diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b93a801af8b..3f41cbcf0bd 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -565,6 +565,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11595. Add default implementation for AbstractFileSystem#truncate. (yliu) + HADOOP-9087. Queue size metric for metric sinks isn't actually maintained + (Akira AJISAKA via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java index de39a13e196..478c3169e7e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java @@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { boolean putMetrics(MetricsBuffer buffer, long logicalTime) { if (logicalTime % period == 0) { LOG.debug("enqueue, logicalTime="+ logicalTime); - if (queue.enqueue(buffer)) return true; + if (queue.enqueue(buffer)) { + refreshQueueSizeGauge(); + return true; + } dropped.incr(); return false; } @@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { public boolean putMetricsImmediate(MetricsBuffer buffer) { WaitableMetricsBuffer waitableBuffer = new WaitableMetricsBuffer(buffer); - if (!queue.enqueue(waitableBuffer)) { + if (queue.enqueue(waitableBuffer)) { + refreshQueueSizeGauge(); + } else { LOG.warn(name + " has a full queue and can't consume the given metrics."); dropped.incr(); return false; @@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { while (!stopping) { try { queue.consumeAll(this); + refreshQueueSizeGauge(); retryDelay = firstRetryDelay; n = retryCount; inError = false; @@ -154,12 +160,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { "suppressing further error messages", e); } queue.clear(); + refreshQueueSizeGauge(); inError = true; // Don't keep complaining ad infinitum } } } } + private void refreshQueueSizeGauge() { + qsize.set(queue.size()); + } + @Override public void consume(MetricsBuffer buffer) { long ts = 0; diff --git a/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm b/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm index 02ff28bc644..915467e5ca1 100644 --- a/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm +++ b/hadoop-common-project/hadoop-common/src/site/apt/Metrics.apt.vm @@ -843,10 +843,7 @@ metricssystem context |<<>><<>> | Total number of dropped sink operations | for the *-------------------------------------+--------------------------------------+ -|<<>><<>> | Current queue length of sink operations \ - | (BUT always set to 0 because nothing to - | increment this metrics, see - | {{{https://issues.apache.org/jira/browse/HADOOP-9941}HADOOP-9941}}) +|<<>><<>> | Current queue length of the sink *-------------------------------------+--------------------------------------+ default context diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index 4c2ebc8e0e8..0f7b15f2ef9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -29,7 +29,9 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -434,6 +436,54 @@ public class TestMetricsSystemImpl { new MetricGaugeInt(MsInfo.NumActiveSinks, 3))); } + @Test + public void testQSize() throws Exception { + new ConfigBuilder().add("*.period", 8) + .add("test.sink.test.class", TestSink.class.getName()) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + final CountDownLatch proceedSignal = new CountDownLatch(1); + final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1); + ms.start(); + try { + MetricsSink slowSink = mock(MetricsSink.class); + MetricsSink dataSink = mock(MetricsSink.class); + ms.registerSink("slowSink", + "The sink that will wait on putMetric", slowSink); + ms.registerSink("dataSink", + "The sink I'll use to get info about slowSink", dataSink); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + reachedPutMetricSignal.countDown(); + proceedSignal.await(); + return null; + } + }).when(slowSink).putMetrics(any(MetricsRecord.class)); + + // trigger metric collection first time + ms.onTimerEvent(); + assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS)); + // Now that the slow sink is still processing the first metric, + // its queue length should be 1 for the second collection. + ms.onTimerEvent(); + verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture()); + List mr = r1.getAllValues(); + Number qSize = Iterables.find(mr.get(1).metrics(), + new Predicate() { + @Override + public boolean apply(@Nullable AbstractMetric input) { + assert input != null; + return input.name().equals("Sink_slowSinkQsize"); + } + }).value(); + assertEquals(1, qSize); + } finally { + proceedSignal.countDown(); + ms.stop(); + } + } + @Metrics(context="test") private static class TestSource { @Metric("C1 desc") MutableCounterLong c1;