HADOOP-15008. Fixed period unit calculation for Hadoop Metrics V2. (Contribute by Erik Krogen)

This commit is contained in:
Eric Yang 2017-11-13 12:40:45 -05:00
parent 975a57a688
commit 1b68b8ff2c
3 changed files with 61 additions and 7 deletions

View File

@ -51,7 +51,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
private final Thread sinkThread;
private volatile boolean stopping = false;
private volatile boolean inError = false;
private final int period, firstRetryDelay, retryCount;
private final int periodMs, firstRetryDelay, retryCount;
private final long oobPutTimeout;
private final float retryBackoff;
private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
@ -62,7 +62,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
MetricsSinkAdapter(String name, String description, MetricsSink sink,
String context, MetricsFilter sourceFilter,
MetricsFilter recordFilter, MetricsFilter metricFilter,
int period, int queueCapacity, int retryDelay,
int periodMs, int queueCapacity, int retryDelay,
float retryBackoff, int retryCount) {
this.name = checkNotNull(name, "name");
this.description = description;
@ -71,7 +71,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
this.sourceFilter = sourceFilter;
this.recordFilter = recordFilter;
this.metricFilter = metricFilter;
this.period = checkArg(period, period > 0, "period");
this.periodMs = checkArg(periodMs, periodMs > 0, "period");
firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
oobPutTimeout = (long)
@ -93,9 +93,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
sinkThread.setDaemon(true);
}
boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
if (logicalTime % period == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTime);
boolean putMetrics(MetricsBuffer buffer, long logicalTimeMs) {
if (logicalTimeMs % periodMs == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTimeMs);
if (queue.enqueue(buffer)) {
refreshQueueSizeGauge();
return true;

View File

@ -519,7 +519,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
conf.getFilter(SOURCE_FILTER_KEY),
conf.getFilter(RECORD_FILTER_KEY),
conf.getFilter(METRIC_FILTER_KEY),
conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
conf.getInt(PERIOD_KEY, PERIOD_DEFAULT) * 1000,
conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
@ -618,6 +618,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
return sources.get(name);
}
@VisibleForTesting
public MetricsSinkAdapter getSinkAdapter(String name) {
return sinks.get(name);
}
private InitMode initMode() {
LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));

View File

@ -39,10 +39,12 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.test.MoreAsserts.*;
import org.apache.hadoop.metrics2.AbstractMetric;
@ -78,8 +80,11 @@ public class TestMetricsSystemImpl {
public static class TestSink implements MetricsSink {
private List<Iterable<AbstractMetric>> metricValues = new ArrayList<>();
@Override public void putMetrics(MetricsRecord record) {
LOG.debug(record.toString());
metricValues.add(record.metrics());
}
@Override public void flush() {}
@ -87,6 +92,10 @@ public class TestMetricsSystemImpl {
@Override public void init(SubsetConfiguration conf) {
LOG.debug(MetricsConfig.toString(conf));
}
List<Iterable<AbstractMetric>> getMetricValues() {
return metricValues;
}
}
@Test public void testInitFirstVerifyStopInvokedImmediately() throws Exception {
@ -559,6 +568,46 @@ public class TestMetricsSystemImpl {
ms.shutdown();
}
@Test
public void testRegisterSinksMultiplePeriods() throws Exception {
new ConfigBuilder().add("test.sink.test1.period", 100000)
.add("test.sink.test1.class", TestSink.class.getName())
.add("test.sink.test2.period", 200000)
.add("test.sink.test2.class", TestSink.class.getName())
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
MetricsSystemImpl ms = new MetricsSystemImpl();
try {
ms.init("test");
TestSink sink1 = (TestSink) ms.getSinkAdapter("test1").sink();
TestSink sink2 = (TestSink) ms.getSinkAdapter("test2").sink();
assertEquals(0, sink1.getMetricValues().size());
assertEquals(0, sink2.getMetricValues().size());
ms.onTimerEvent();
// Give some time for the publish event to go through
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return sink1.getMetricValues().size() > 0;
}
}, 10, 10000);
assertEquals(1, sink1.getMetricValues().size());
assertEquals(0, sink2.getMetricValues().size());
ms.onTimerEvent();
// Give some time for the publish event to go through
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return sink1.getMetricValues().size() > 1 &&
sink2.getMetricValues().size() > 0;
}
}, 10, 10000);
assertEquals(2, sink1.getMetricValues().size());
assertEquals(1, sink2.getMetricValues().size());
} finally {
ms.shutdown();
}
}
@Metrics(context="test")
private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1;