HADOOP-15008. Fixed period unit calculation for Hadoop Metrics V2.

This commit is contained in:
Eric Yang 2017-11-13 12:40:45 -05:00
parent 975a57a688
commit 782681c73e
3 changed files with 61 additions and 7 deletions

View File

@ -51,7 +51,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
private final Thread sinkThread; private final Thread sinkThread;
private volatile boolean stopping = false; private volatile boolean stopping = false;
private volatile boolean inError = false; private volatile boolean inError = false;
private final int period, firstRetryDelay, retryCount; private final int periodMs, firstRetryDelay, retryCount;
private final long oobPutTimeout; private final long oobPutTimeout;
private final float retryBackoff; private final float retryBackoff;
private final MetricsRegistry registry = new MetricsRegistry("sinkadapter"); private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
@ -62,7 +62,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
MetricsSinkAdapter(String name, String description, MetricsSink sink, MetricsSinkAdapter(String name, String description, MetricsSink sink,
String context, MetricsFilter sourceFilter, String context, MetricsFilter sourceFilter,
MetricsFilter recordFilter, MetricsFilter metricFilter, MetricsFilter recordFilter, MetricsFilter metricFilter,
int period, int queueCapacity, int retryDelay, int periodMs, int queueCapacity, int retryDelay,
float retryBackoff, int retryCount) { float retryBackoff, int retryCount) {
this.name = checkNotNull(name, "name"); this.name = checkNotNull(name, "name");
this.description = description; this.description = description;
@ -71,7 +71,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
this.sourceFilter = sourceFilter; this.sourceFilter = sourceFilter;
this.recordFilter = recordFilter; this.recordFilter = recordFilter;
this.metricFilter = metricFilter; this.metricFilter = metricFilter;
this.period = checkArg(period, period > 0, "period"); this.periodMs = checkArg(periodMs, periodMs > 0, "period");
firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay"); firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff"); this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
oobPutTimeout = (long) oobPutTimeout = (long)
@ -93,9 +93,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
sinkThread.setDaemon(true); sinkThread.setDaemon(true);
} }
boolean putMetrics(MetricsBuffer buffer, long logicalTime) { boolean putMetrics(MetricsBuffer buffer, long logicalTimeMs) {
if (logicalTime % period == 0) { if (logicalTimeMs % periodMs == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTime); LOG.debug("enqueue, logicalTime="+ logicalTimeMs);
if (queue.enqueue(buffer)) { if (queue.enqueue(buffer)) {
refreshQueueSizeGauge(); refreshQueueSizeGauge();
return true; return true;

View File

@ -519,7 +519,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
conf.getFilter(SOURCE_FILTER_KEY), conf.getFilter(SOURCE_FILTER_KEY),
conf.getFilter(RECORD_FILTER_KEY), conf.getFilter(RECORD_FILTER_KEY),
conf.getFilter(METRIC_FILTER_KEY), conf.getFilter(METRIC_FILTER_KEY),
conf.getInt(PERIOD_KEY, PERIOD_DEFAULT), conf.getInt(PERIOD_KEY, PERIOD_DEFAULT) * 1000,
conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT), conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT), conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT), conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
@ -618,6 +618,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
return sources.get(name); return sources.get(name);
} }
@VisibleForTesting
public MetricsSinkAdapter getSinkAdapter(String name) {
return sinks.get(name);
}
private InitMode initMode() { private InitMode initMode() {
LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY)); LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY)); LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));

View File

@ -39,10 +39,12 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.commons.configuration2.SubsetConfiguration; import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.test.MoreAsserts.*; import static org.apache.hadoop.test.MoreAsserts.*;
import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.AbstractMetric;
@ -78,8 +80,11 @@ public class TestMetricsSystemImpl {
public static class TestSink implements MetricsSink { public static class TestSink implements MetricsSink {
private List<Iterable<AbstractMetric>> metricValues = new ArrayList<>();
@Override public void putMetrics(MetricsRecord record) { @Override public void putMetrics(MetricsRecord record) {
LOG.debug(record.toString()); LOG.debug(record.toString());
metricValues.add(record.metrics());
} }
@Override public void flush() {} @Override public void flush() {}
@ -87,6 +92,10 @@ public class TestMetricsSystemImpl {
@Override public void init(SubsetConfiguration conf) { @Override public void init(SubsetConfiguration conf) {
LOG.debug(MetricsConfig.toString(conf)); LOG.debug(MetricsConfig.toString(conf));
} }
List<Iterable<AbstractMetric>> getMetricValues() {
return metricValues;
}
} }
@Test public void testInitFirstVerifyStopInvokedImmediately() throws Exception { @Test public void testInitFirstVerifyStopInvokedImmediately() throws Exception {
@ -559,6 +568,46 @@ public class TestMetricsSystemImpl {
ms.shutdown(); ms.shutdown();
} }
@Test
public void testRegisterSinksMultiplePeriods() throws Exception {
new ConfigBuilder().add("test.sink.test1.period", 100000)
.add("test.sink.test1.class", TestSink.class.getName())
.add("test.sink.test2.period", 200000)
.add("test.sink.test2.class", TestSink.class.getName())
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
MetricsSystemImpl ms = new MetricsSystemImpl();
try {
ms.init("test");
TestSink sink1 = (TestSink) ms.getSinkAdapter("test1").sink();
TestSink sink2 = (TestSink) ms.getSinkAdapter("test2").sink();
assertEquals(0, sink1.getMetricValues().size());
assertEquals(0, sink2.getMetricValues().size());
ms.onTimerEvent();
// Give some time for the publish event to go through
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return sink1.getMetricValues().size() > 0;
}
}, 10, 10000);
assertEquals(1, sink1.getMetricValues().size());
assertEquals(0, sink2.getMetricValues().size());
ms.onTimerEvent();
// Give some time for the publish event to go through
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return sink1.getMetricValues().size() > 1 &&
sink2.getMetricValues().size() > 0;
}
}, 10, 10000);
assertEquals(2, sink1.getMetricValues().size());
assertEquals(1, sink2.getMetricValues().size());
} finally {
ms.shutdown();
}
}
@Metrics(context="test") @Metrics(context="test")
private static class TestSource { private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1; @Metric("C1 desc") MutableCounterLong c1;