HADOOP-11932. MetricsSinkAdapter may hang when being stopped. Contributed by Brahma Reddy Battula

(cherry picked from commit f59612edd7)
This commit is contained in:
Jian He 2015-08-05 16:12:45 -07:00
parent b83d475252
commit 5950c1f6f8
3 changed files with 66 additions and 3 deletions

View File

@ -581,6 +581,9 @@ Release 2.7.2 - UNRELEASED
HADOOP-12304. Applications using FileContext fail with the default file HADOOP-12304. Applications using FileContext fail with the default file
system configured to be wasb/s3/etc. (cnauroth) system configured to be wasb/s3/etc. (cnauroth)
HADOOP-11932. MetricsSinkAdapter may hang when being stopped.
(Brahma Reddy Battula via jianhe)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -209,15 +209,15 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
void stop() { void stop() {
stopping = true; stopping = true;
sinkThread.interrupt(); sinkThread.interrupt();
if (sink instanceof Closeable) {
IOUtils.cleanup(LOG, (Closeable)sink);
}
try { try {
sinkThread.join(); sinkThread.join();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
LOG.warn("Stop interrupted", e); LOG.warn("Stop interrupted", e);
} }
if (sink instanceof Closeable) {
IOUtils.cleanup(LOG, (Closeable)sink);
}
} }
String name() { String name() {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.metrics2.impl; package org.apache.hadoop.metrics2.impl;
import java.io.Closeable;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@ -484,6 +486,64 @@ public class TestMetricsSystemImpl {
} }
} }
/**
* Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
* until closed.
*/
private static class TestClosableSink implements MetricsSink, Closeable {
boolean closed = false;
CountDownLatch collectingLatch;
public TestClosableSink(CountDownLatch collectingLatch) {
this.collectingLatch = collectingLatch;
}
@Override
public void init(SubsetConfiguration conf) {
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public void putMetrics(MetricsRecord record) {
while (!closed) {
collectingLatch.countDown();
}
}
@Override
public void flush() {
}
}
/**
* HADOOP-11932
*/
@Test(timeout = 5000)
public void testHangOnSinkRead() throws Exception {
new ConfigBuilder().add("*.period", 8)
.add("test.sink.test.class", TestSink.class.getName())
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
MetricsSystemImpl ms = new MetricsSystemImpl("Test");
ms.start();
try {
CountDownLatch collectingLatch = new CountDownLatch(1);
MetricsSink sink = new TestClosableSink(collectingLatch);
ms.registerSink("closeableSink",
"The sink will be used to test closeability", sink);
// trigger metric collection first time
ms.onTimerEvent();
// Make sure that sink is collecting metrics
assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
} finally {
ms.stop();
}
}
@Metrics(context="test") @Metrics(context="test")
private static class TestSource { private static class TestSource {
@Metric("C1 desc") MutableCounterLong c1; @Metric("C1 desc") MutableCounterLong c1;