HADOOP-14503. Make RollingAverages a mutable metric. Contributed by Hanisha Koneru.
This commit is contained in:
parent
182e98c703
commit
9529513f18
|
@ -314,6 +314,15 @@ public class MetricsRegistry {
|
|||
return rates;
|
||||
}
|
||||
|
||||
public synchronized MutableRollingAverages newMutableRollingAverages(
|
||||
String name, String valueName) {
|
||||
checkMetricName(name);
|
||||
MutableRollingAverages rollingAverages =
|
||||
new MutableRollingAverages(valueName);
|
||||
metricsMap.put(name, rollingAverages);
|
||||
return rollingAverages;
|
||||
}
|
||||
|
||||
synchronized void add(String name, MutableMetric metric) {
|
||||
checkMetricName(name);
|
||||
metricsMap.put(name, metric);
|
||||
|
|
|
@ -78,6 +78,10 @@ public class MutableMetricsFactory {
|
|||
annotation.sampleName(), annotation.valueName(),
|
||||
annotation.always());
|
||||
}
|
||||
if (cls == MutableRollingAverages.class) {
|
||||
return registry.newMutableRollingAverages(info.name(),
|
||||
annotation.valueName());
|
||||
}
|
||||
throw new MetricsException("Unsupported metric field "+ field.getName() +
|
||||
" of type "+ field.getType().getName());
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -57,29 +58,30 @@ import static org.apache.hadoop.metrics2.lib.Interns.*;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class RollingAverages extends MutableMetric implements Closeable {
|
||||
public class MutableRollingAverages extends MutableMetric implements Closeable {
|
||||
|
||||
private final MutableRatesWithAggregation innerMetrics =
|
||||
private MutableRatesWithAggregation innerMetrics =
|
||||
new MutableRatesWithAggregation();
|
||||
|
||||
private static final ScheduledExecutorService SCHEDULER = Executors
|
||||
@VisibleForTesting
|
||||
static final ScheduledExecutorService SCHEDULER = Executors
|
||||
.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
|
||||
.setNameFormat("RollingAverages-%d").build());
|
||||
.setNameFormat("MutableRollingAverages-%d").build());
|
||||
|
||||
private ScheduledFuture<?> scheduledTask = null;
|
||||
|
||||
@Nullable
|
||||
private Map<String, MutableRate> currentSnapshot;
|
||||
|
||||
private final int numWindows;
|
||||
private final String avgInfoNameTemplate;
|
||||
private final String avgInfoDescTemplate;
|
||||
private int numWindows;
|
||||
|
||||
private static class SumAndCount {
|
||||
private final double sum;
|
||||
private final long count;
|
||||
|
||||
public SumAndCount(final double sum, final long count) {
|
||||
SumAndCount(final double sum, final long count) {
|
||||
this.sum = sum;
|
||||
this.count = count;
|
||||
}
|
||||
|
@ -105,44 +107,36 @@ public class RollingAverages extends MutableMetric implements Closeable {
|
|||
private ConcurrentMap<String, LinkedBlockingDeque<SumAndCount>> averages =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
|
||||
private static final int NUM_WINDOWS_DEFAULT = 36;
|
||||
|
||||
/**
|
||||
* Constructor of {@link RollingAverages}.
|
||||
* @param windowSizeMs
|
||||
* The number of milliseconds of each window for which subset
|
||||
* of samples are gathered to compute the rolling average, A.K.A.
|
||||
* roll over interval.
|
||||
* @param numWindows
|
||||
* The number of windows maintained to compute the rolling average.
|
||||
* @param valueName
|
||||
* of the metric (e.g. "Time", "Latency")
|
||||
* Constructor for {@link MutableRollingAverages}.
|
||||
* @param metricValueName
|
||||
*/
|
||||
public RollingAverages(
|
||||
final long windowSizeMs,
|
||||
final int numWindows,
|
||||
final String valueName) {
|
||||
String uvName = StringUtils.capitalize(valueName);
|
||||
String lvName = StringUtils.uncapitalize(valueName);
|
||||
avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
|
||||
avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
|
||||
this.numWindows = numWindows;
|
||||
public MutableRollingAverages(String metricValueName) {
|
||||
if (metricValueName == null) {
|
||||
metricValueName = "";
|
||||
}
|
||||
avgInfoNameTemplate = "[%s]" + "RollingAvg" +
|
||||
StringUtils.capitalize(metricValueName);
|
||||
avgInfoDescTemplate = "Rolling average " +
|
||||
StringUtils.uncapitalize(metricValueName) +" for "+ "%s";
|
||||
numWindows = NUM_WINDOWS_DEFAULT;
|
||||
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
|
||||
windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
|
||||
WINDOW_SIZE_MS_DEFAULT, WINDOW_SIZE_MS_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor of {@link RollingAverages}.
|
||||
* @param windowSizeMs
|
||||
* The number of seconds of each window for which sub set of samples
|
||||
* are gathered to compute rolling average, also A.K.A roll over
|
||||
* interval.
|
||||
* @param numWindows
|
||||
* The number of windows maintained in the same time to compute the
|
||||
* average of the rolling averages.
|
||||
* This method is for testing only to replace the scheduledTask.
|
||||
*/
|
||||
public RollingAverages(
|
||||
final long windowSizeMs,
|
||||
final int numWindows) {
|
||||
this(windowSizeMs, numWindows, "Time");
|
||||
@VisibleForTesting
|
||||
synchronized void replaceScheduledTask(int windows, long interval,
|
||||
TimeUnit timeUnit) {
|
||||
numWindows = windows;
|
||||
scheduledTask.cancel(true);
|
||||
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
|
||||
interval, interval, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -190,9 +184,9 @@ public class RollingAverages extends MutableMetric implements Closeable {
|
|||
}
|
||||
|
||||
private static class RatesRoller implements Runnable {
|
||||
private final RollingAverages parent;
|
||||
private final MutableRollingAverages parent;
|
||||
|
||||
public RatesRoller(final RollingAverages parent) {
|
||||
RatesRoller(final MutableRollingAverages parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
|
@ -218,7 +212,7 @@ public class RollingAverages extends MutableMetric implements Closeable {
|
|||
|
||||
/**
|
||||
* Iterates over snapshot to capture all Avg metrics into rolling structure
|
||||
* {@link RollingAverages#averages}.
|
||||
* {@link MutableRollingAverages#averages}.
|
||||
* <p>
|
||||
* This function is not thread safe, callers should ensure thread safety.
|
||||
* </p>
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.lib;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A helper class that can provide test cases access to package-private
|
||||
* methods.
|
||||
*/
|
||||
public final class MetricsTestHelper {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(MetricsTestHelper.class);
|
||||
|
||||
private MetricsTestHelper() {
|
||||
//not called
|
||||
}
|
||||
|
||||
/**
|
||||
* Replace the rolling averages windows for a
|
||||
* {@link MutableRollingAverages} metric.
|
||||
*
|
||||
*/
|
||||
public static void replaceRollingAveragesScheduler(
|
||||
MutableRollingAverages mutableRollingAverages,
|
||||
int numWindows, long interval, TimeUnit timeUnit) {
|
||||
mutableRollingAverages.replaceScheduledTask(
|
||||
numWindows, interval, timeUnit);
|
||||
}
|
||||
}
|
|
@ -17,23 +17,30 @@
|
|||
*/
|
||||
package org.apache.hadoop.metrics2.lib;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.*;
|
||||
import static org.mockito.Matchers.anyDouble;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* This class tests various cases of the algorithms implemented in
|
||||
* {@link RollingAverages}.
|
||||
* {@link MutableRollingAverages}.
|
||||
*/
|
||||
public class TestRollingAverages {
|
||||
public class TestMutableRollingAverages {
|
||||
|
||||
/**
|
||||
* Tests if the results are correct if no samples are inserted, dry run of
|
||||
* empty roll over.
|
||||
|
@ -42,8 +49,9 @@ public class TestRollingAverages {
|
|||
public void testRollingAveragesEmptyRollover() throws Exception {
|
||||
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
|
||||
/* 5s interval and 2 windows */
|
||||
try (final RollingAverages rollingAverages =
|
||||
new RollingAverages(5000, 2)) {
|
||||
try (MutableRollingAverages rollingAverages =
|
||||
new MutableRollingAverages("Time")) {
|
||||
rollingAverages.replaceScheduledTask(2, 5, TimeUnit.SECONDS);
|
||||
/* Check it initially */
|
||||
rollingAverages.snapshot(rb, true);
|
||||
verify(rb, never()).addGauge(
|
||||
|
@ -76,11 +84,10 @@ public class TestRollingAverages {
|
|||
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
|
||||
final String name = "foo2";
|
||||
final int windowSizeMs = 5000; // 5s roll over interval
|
||||
final int numWindows = 2;
|
||||
final int numOpsPerIteration = 1000;
|
||||
try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
|
||||
numWindows)) {
|
||||
|
||||
try (MutableRollingAverages rollingAverages =
|
||||
new MutableRollingAverages("Time")) {
|
||||
rollingAverages.replaceScheduledTask(2, 5000, TimeUnit.MILLISECONDS);
|
||||
/* Push values for three intervals */
|
||||
final long start = Time.monotonicNow();
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
|
@ -121,4 +128,64 @@ public class TestRollingAverages {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that MutableRollingAverages gives expected results after
|
||||
* initialization.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testMutableRollingAveragesMetric() throws Exception {
|
||||
final DummyTestMetric testMetric = new DummyTestMetric();
|
||||
testMetric.create();
|
||||
|
||||
testMetric.add("metric1", 100);
|
||||
testMetric.add("metric1", 900);
|
||||
testMetric.add("metric2", 1000);
|
||||
testMetric.add("metric2", 1000);
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
testMetric.collectThreadLocalStates();
|
||||
return testMetric.getStats().size() > 0;
|
||||
}
|
||||
}, 500, 5000);
|
||||
|
||||
MetricsRecordBuilder rb = getMetrics(DummyTestMetric.METRIC_NAME);
|
||||
|
||||
double metric1Avg = getDoubleGauge("[Metric1]RollingAvgTesting", rb);
|
||||
double metric2Avg = getDoubleGauge("[Metric2]RollingAvgTesting", rb);
|
||||
Assert.assertTrue("The rolling average of metric1 is not as expected",
|
||||
metric1Avg == 500.0);
|
||||
Assert.assertTrue("The rolling average of metric2 is not as expected",
|
||||
metric2Avg == 1000.0);
|
||||
|
||||
}
|
||||
|
||||
class DummyTestMetric {
|
||||
@Metric (valueName = "testing")
|
||||
private MutableRollingAverages rollingAverages;
|
||||
|
||||
static final String METRIC_NAME = "RollingAveragesTestMetric";
|
||||
|
||||
protected void create() {
|
||||
DefaultMetricsSystem.instance().register(METRIC_NAME,
|
||||
"mutable rolling averages test", this);
|
||||
rollingAverages.replaceScheduledTask(10, 1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
void add(String name, long latency) {
|
||||
rollingAverages.add(name, latency);
|
||||
}
|
||||
|
||||
void collectThreadLocalStates() {
|
||||
rollingAverages.collectThreadLocalStates();
|
||||
}
|
||||
|
||||
Map<String, Double> getStats() {
|
||||
return rollingAverages.getStats(0);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -434,18 +434,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
|
||||
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
|
||||
|
||||
// The following setting is not meant to be changed by administrators.
|
||||
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
|
||||
"dfs.metrics.rolling.averages.window.length";
|
||||
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
|
||||
300 * 1000;
|
||||
|
||||
// The following setting is not meant to be changed by administrators.
|
||||
public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
|
||||
"dfs.metrics.rolling.average.num.windows";
|
||||
public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
|
||||
36;
|
||||
|
||||
public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
|
||||
"dfs.datanode.peer.stats.enabled";
|
||||
public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
|
||||
|
|
|
@ -1383,7 +1383,7 @@ public class DataNode extends ReconfigurableBase
|
|||
|
||||
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
|
||||
peerMetrics = dnConf.peerStatsEnabled ?
|
||||
DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
|
||||
DataNodePeerMetrics.create(getDisplayName()) : null;
|
||||
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
|
||||
|
||||
blockRecoveryWorker = new BlockRecoveryWorker(this);
|
||||
|
|
|
@ -22,16 +22,13 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.RollingAverages;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
|
||||
|
@ -44,7 +41,7 @@ public class DataNodePeerMetrics {
|
|||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
DataNodePeerMetrics.class);
|
||||
|
||||
private final RollingAverages sendPacketDownstreamRollingAvgerages;
|
||||
private final MutableRollingAverages sendPacketDownstreamRollingAverages;
|
||||
|
||||
private final String name;
|
||||
|
||||
|
@ -64,15 +61,11 @@ public class DataNodePeerMetrics {
|
|||
@VisibleForTesting
|
||||
static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
|
||||
|
||||
public DataNodePeerMetrics(
|
||||
final String name,
|
||||
final long windowSizeMs,
|
||||
final int numWindows) {
|
||||
public DataNodePeerMetrics(final String name) {
|
||||
this.name = name;
|
||||
this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES,
|
||||
LOW_THRESHOLD_MS);
|
||||
sendPacketDownstreamRollingAvgerages = new RollingAverages(
|
||||
windowSizeMs, numWindows);
|
||||
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
|
||||
}
|
||||
|
||||
public String name() {
|
||||
|
@ -82,23 +75,12 @@ public class DataNodePeerMetrics {
|
|||
/**
|
||||
* Creates an instance of DataNodePeerMetrics, used for registration.
|
||||
*/
|
||||
public static DataNodePeerMetrics create(Configuration conf, String dnName) {
|
||||
public static DataNodePeerMetrics create(String dnName) {
|
||||
final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
|
||||
? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
|
||||
: dnName.replace(':', '-'));
|
||||
|
||||
final long windowSizeMs = conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
|
||||
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
final int numWindows = conf.getInt(
|
||||
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
|
||||
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
|
||||
|
||||
return new DataNodePeerMetrics(
|
||||
name,
|
||||
windowSizeMs,
|
||||
numWindows);
|
||||
return new DataNodePeerMetrics(name);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -112,7 +94,7 @@ public class DataNodePeerMetrics {
|
|||
public void addSendPacketDownstream(
|
||||
final String peerAddr,
|
||||
final long elapsedMs) {
|
||||
sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
|
||||
sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -120,7 +102,7 @@ public class DataNodePeerMetrics {
|
|||
*/
|
||||
public String dumpSendPacketDownstreamAvgInfoAsJson() {
|
||||
final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
|
||||
sendPacketDownstreamRollingAvgerages.snapshot(builder, true);
|
||||
sendPacketDownstreamRollingAverages.snapshot(builder, true);
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
|
@ -128,7 +110,7 @@ public class DataNodePeerMetrics {
|
|||
* Collects states maintained in {@link ThreadLocal}, if any.
|
||||
*/
|
||||
public void collectThreadLocalStates() {
|
||||
sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
|
||||
sendPacketDownstreamRollingAverages.collectThreadLocalStates();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -139,10 +121,14 @@ public class DataNodePeerMetrics {
|
|||
// This maps the metric name to the aggregate latency.
|
||||
// The metric name is the datanode ID.
|
||||
final Map<String, Double> stats =
|
||||
sendPacketDownstreamRollingAvgerages.getStats(
|
||||
sendPacketDownstreamRollingAverages.getStats(
|
||||
MIN_OUTLIER_DETECTION_SAMPLES);
|
||||
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
|
||||
|
||||
return slowNodeDetector.getOutliers(stats);
|
||||
}
|
||||
|
||||
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
|
||||
return sendPacketDownstreamRollingAverages;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.Test;
|
||||
|
@ -42,16 +43,13 @@ public class TestDataNodePeerMetrics {
|
|||
final int numOpsPerIteration = 1000;
|
||||
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
conf.setTimeDuration(
|
||||
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
|
||||
windowSize, TimeUnit.SECONDS);
|
||||
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
|
||||
numWindows);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
||||
|
||||
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
|
||||
conf,
|
||||
"Sample-DataNode");
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
peerMetrics.getSendPacketDownstreamRollingAverages(),
|
||||
numWindows, windowSize, TimeUnit.SECONDS);
|
||||
final long start = Time.monotonicNow();
|
||||
for (int i = 1; i <= iterations; i++) {
|
||||
final String peerAddr = genPeerAddress();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode.metrics;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Before;
|
||||
|
@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
@ -73,8 +75,12 @@ public class TestDataNodeOutlierDetectionViaMetrics {
|
|||
final String slowNodeName = "SlowNode";
|
||||
|
||||
final DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
|
||||
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
|
||||
ROLLING_AVERAGE_WINDOWS);
|
||||
"PeerMetrics-For-Test");
|
||||
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
peerMetrics.getSendPacketDownstreamRollingAverages(),
|
||||
ROLLING_AVERAGE_WINDOWS,
|
||||
WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
injectFastNodesSamples(peerMetrics);
|
||||
injectSlowNodeSamples(peerMetrics, slowNodeName);
|
||||
|
@ -101,8 +107,12 @@ public class TestDataNodeOutlierDetectionViaMetrics {
|
|||
@Test
|
||||
public void testWithNoOutliers() throws Exception {
|
||||
DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
|
||||
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
|
||||
ROLLING_AVERAGE_WINDOWS);
|
||||
"PeerMetrics-For-Test");
|
||||
|
||||
MetricsTestHelper.replaceRollingAveragesScheduler(
|
||||
peerMetrics.getSendPacketDownstreamRollingAverages(),
|
||||
ROLLING_AVERAGE_WINDOWS,
|
||||
WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
injectFastNodesSamples(peerMetrics);
|
||||
|
||||
|
|
|
@ -68,10 +68,6 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
|
|||
// Purposely hidden, based on comments in DFSConfigKeys
|
||||
configurationPropsToSkipCompare
|
||||
.add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
|
||||
configurationPropsToSkipCompare
|
||||
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
|
||||
configurationPropsToSkipCompare
|
||||
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
|
||||
|
||||
// Fully deprecated properties?
|
||||
configurationPropsToSkipCompare
|
||||
|
|
Loading…
Reference in New Issue