diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java index 1199ebd6fdd..f2e607b5776 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java @@ -51,7 +51,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { private final Thread sinkThread; private volatile boolean stopping = false; private volatile boolean inError = false; - private final int period, firstRetryDelay, retryCount; + private final int periodMs, firstRetryDelay, retryCount; private final long oobPutTimeout; private final float retryBackoff; private final MetricsRegistry registry = new MetricsRegistry("sinkadapter"); @@ -62,7 +62,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { MetricsSinkAdapter(String name, String description, MetricsSink sink, String context, MetricsFilter sourceFilter, MetricsFilter recordFilter, MetricsFilter metricFilter, - int period, int queueCapacity, int retryDelay, + int periodMs, int queueCapacity, int retryDelay, float retryBackoff, int retryCount) { this.name = checkNotNull(name, "name"); this.description = description; @@ -71,7 +71,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { this.sourceFilter = sourceFilter; this.recordFilter = recordFilter; this.metricFilter = metricFilter; - this.period = checkArg(period, period > 0, "period"); + this.periodMs = checkArg(periodMs, periodMs > 0, "period"); firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay"); this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff"); oobPutTimeout = (long) @@ -93,9 +93,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { sinkThread.setDaemon(true); } - boolean putMetrics(MetricsBuffer buffer, long logicalTime) { - if (logicalTime % period == 0) { - LOG.debug("enqueue, logicalTime="+ logicalTime); + boolean putMetrics(MetricsBuffer buffer, long logicalTimeMs) { + if (logicalTimeMs % periodMs == 0) { + LOG.debug("enqueue, logicalTime="+ logicalTimeMs); if (queue.enqueue(buffer)) { refreshQueueSizeGauge(); return true; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java index ee1672ef0c3..624edc96b8a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java @@ -519,7 +519,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { conf.getFilter(SOURCE_FILTER_KEY), conf.getFilter(RECORD_FILTER_KEY), conf.getFilter(METRIC_FILTER_KEY), - conf.getInt(PERIOD_KEY, PERIOD_DEFAULT), + conf.getInt(PERIOD_KEY, PERIOD_DEFAULT) * 1000, conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT), conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT), conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT), @@ -618,6 +618,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource { return sources.get(name); } + @VisibleForTesting + public MetricsSinkAdapter getSinkAdapter(String name) { + return sinks.get(name); + } + private InitMode initMode() { LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY)); LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY)); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index abd1b132b0a..f3a2553511a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -39,10 +39,12 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import org.apache.commons.configuration2.SubsetConfiguration; import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.test.MoreAsserts.*; import org.apache.hadoop.metrics2.AbstractMetric; @@ -78,8 +80,11 @@ public class TestMetricsSystemImpl { public static class TestSink implements MetricsSink { + private List> metricValues = new ArrayList<>(); + @Override public void putMetrics(MetricsRecord record) { LOG.debug(record.toString()); + metricValues.add(record.metrics()); } @Override public void flush() {} @@ -87,6 +92,10 @@ public class TestMetricsSystemImpl { @Override public void init(SubsetConfiguration conf) { LOG.debug(MetricsConfig.toString(conf)); } + + List> getMetricValues() { + return metricValues; + } } @Test public void testInitFirstVerifyStopInvokedImmediately() throws Exception { @@ -559,6 +568,46 @@ public class TestMetricsSystemImpl { ms.shutdown(); } + @Test + public void testRegisterSinksMultiplePeriods() throws Exception { + new ConfigBuilder().add("test.sink.test1.period", 100000) + .add("test.sink.test1.class", TestSink.class.getName()) + .add("test.sink.test2.period", 200000) + .add("test.sink.test2.class", TestSink.class.getName()) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl(); + try { + ms.init("test"); + TestSink sink1 = (TestSink) ms.getSinkAdapter("test1").sink(); + TestSink sink2 = (TestSink) ms.getSinkAdapter("test2").sink(); + assertEquals(0, sink1.getMetricValues().size()); + assertEquals(0, sink2.getMetricValues().size()); + ms.onTimerEvent(); + // Give some time for the publish event to go through + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return sink1.getMetricValues().size() > 0; + } + }, 10, 10000); + assertEquals(1, sink1.getMetricValues().size()); + assertEquals(0, sink2.getMetricValues().size()); + ms.onTimerEvent(); + // Give some time for the publish event to go through + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return sink1.getMetricValues().size() > 1 && + sink2.getMetricValues().size() > 0; + } + }, 10, 10000); + assertEquals(2, sink1.getMetricValues().size()); + assertEquals(1, sink2.getMetricValues().size()); + } finally { + ms.shutdown(); + } + } + @Metrics(context="test") private static class TestSource { @Metric("C1 desc") MutableCounterLong c1;