HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA

(cherry picked from commit f0f2992686)
This commit is contained in:
Jason Lowe 2015-02-19 17:38:39 +00:00
parent fee29e4a4f
commit b1fc4ec57a
4 changed files with 67 additions and 6 deletions

View File

@ -565,6 +565,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11595. Add default implementation for AbstractFileSystem#truncate. HADOOP-11595. Add default implementation for AbstractFileSystem#truncate.
(yliu) (yliu)
HADOOP-9087. Queue size metric for metric sinks isn't actually maintained
(Akira AJISAKA via jlowe)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
boolean putMetrics(MetricsBuffer buffer, long logicalTime) { boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
if (logicalTime % period == 0) { if (logicalTime % period == 0) {
LOG.debug("enqueue, logicalTime="+ logicalTime); LOG.debug("enqueue, logicalTime="+ logicalTime);
if (queue.enqueue(buffer)) return true; if (queue.enqueue(buffer)) {
refreshQueueSizeGauge();
return true;
}
dropped.incr(); dropped.incr();
return false; return false;
} }
@ -105,7 +108,9 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
public boolean putMetricsImmediate(MetricsBuffer buffer) { public boolean putMetricsImmediate(MetricsBuffer buffer) {
WaitableMetricsBuffer waitableBuffer = WaitableMetricsBuffer waitableBuffer =
new WaitableMetricsBuffer(buffer); new WaitableMetricsBuffer(buffer);
if (!queue.enqueue(waitableBuffer)) { if (queue.enqueue(waitableBuffer)) {
refreshQueueSizeGauge();
} else {
LOG.warn(name + " has a full queue and can't consume the given metrics."); LOG.warn(name + " has a full queue and can't consume the given metrics.");
dropped.incr(); dropped.incr();
return false; return false;
@ -127,6 +132,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
while (!stopping) { while (!stopping) {
try { try {
queue.consumeAll(this); queue.consumeAll(this);
refreshQueueSizeGauge();
retryDelay = firstRetryDelay; retryDelay = firstRetryDelay;
n = retryCount; n = retryCount;
inError = false; inError = false;
@ -154,12 +160,17 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
"suppressing further error messages", e); "suppressing further error messages", e);
} }
queue.clear(); queue.clear();
refreshQueueSizeGauge();
inError = true; // Don't keep complaining ad infinitum inError = true; // Don't keep complaining ad infinitum
} }
} }
} }
} }
private void refreshQueueSizeGauge() {
qsize.set(queue.size());
}
@Override @Override
public void consume(MetricsBuffer buffer) { public void consume(MetricsBuffer buffer) {
long ts = 0; long ts = 0;

View File

@ -843,10 +843,7 @@ metricssystem context
|<<<Sink_>>><instance><<<Dropped>>> | Total number of dropped sink operations |<<<Sink_>>><instance><<<Dropped>>> | Total number of dropped sink operations
| for the <instance> | for the <instance>
*-------------------------------------+--------------------------------------+ *-------------------------------------+--------------------------------------+
|<<<Sink_>>><instance><<<Qsize>>> | Current queue length of sink operations \ |<<<Sink_>>><instance><<<Qsize>>> | Current queue length of the sink
| (BUT always set to 0 because nothing to
| increment this metrics, see
| {{{https://issues.apache.org/jira/browse/HADOOP-9941}HADOOP-9941}})
*-------------------------------------+--------------------------------------+ *-------------------------------------+--------------------------------------+
default context default context

View File

@ -29,7 +29,9 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Captor; import org.mockito.Captor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -434,6 +436,54 @@ public class TestMetricsSystemImpl {
new MetricGaugeInt(MsInfo.NumActiveSinks, 3))); new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
} }
@Test
public void testQSize() throws Exception {
new ConfigBuilder().add("*.period", 8)
.add("test.sink.test.class", TestSink.class.getName())
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
MetricsSystemImpl ms = new MetricsSystemImpl("Test");
final CountDownLatch proceedSignal = new CountDownLatch(1);
final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1);
ms.start();
try {
MetricsSink slowSink = mock(MetricsSink.class);
MetricsSink dataSink = mock(MetricsSink.class);
ms.registerSink("slowSink",
"The sink that will wait on putMetric", slowSink);
ms.registerSink("dataSink",
"The sink I'll use to get info about slowSink", dataSink);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
reachedPutMetricSignal.countDown();
proceedSignal.await();
return null;
}
}).when(slowSink).putMetrics(any(MetricsRecord.class));
// trigger metric collection first time
ms.onTimerEvent();
assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS));
// Now that the slow sink is still processing the first metric,
// its queue length should be 1 for the second collection.
ms.onTimerEvent();
verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
List<MetricsRecord> mr = r1.getAllValues();
Number qSize = Iterables.find(mr.get(1).metrics(),
new Predicate<AbstractMetric>() {
@Override
public boolean apply(@Nullable AbstractMetric input) {
assert input != null;
return input.name().equals("Sink_slowSinkQsize");
}
}).value();
assertEquals(1, qSize);
} finally {
proceedSignal.countDown();
ms.stop();
}
}
@Metrics(context="test") @Metrics(context="test")
private static class TestSource { private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1; @Metric("C1 desc") MutableCounterLong c1;