HADOOP-11932. MetricsSinkAdapter may hang when being stopped. Contributed by Brahma Reddy Battula
This commit is contained in:
parent
3da0bedaec
commit
f59612edd7
|
@ -1079,6 +1079,9 @@ Release 2.7.2 - UNRELEASED
|
|||
HADOOP-12304. Applications using FileContext fail with the default file
|
||||
system configured to be wasb/s3/etc. (cnauroth)
|
||||
|
||||
HADOOP-11932. MetricsSinkAdapter may hang when being stopped.
|
||||
(Brahma Reddy Battula via jianhe)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -206,14 +206,14 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
|
|||
void stop() {
|
||||
stopping = true;
|
||||
sinkThread.interrupt();
|
||||
if (sink instanceof Closeable) {
|
||||
IOUtils.cleanup(LOG, (Closeable)sink);
|
||||
}
|
||||
try {
|
||||
sinkThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Stop interrupted", e);
|
||||
}
|
||||
if (sink instanceof Closeable) {
|
||||
IOUtils.cleanup(LOG, (Closeable)sink);
|
||||
}
|
||||
}
|
||||
|
||||
String name() {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.metrics2.impl;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
|
@ -484,6 +486,64 @@ public class TestMetricsSystemImpl {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to verify HADOOP-11932. Instead of reading from HTTP, going in loop
|
||||
* until closed.
|
||||
*/
|
||||
private static class TestClosableSink implements MetricsSink, Closeable {
|
||||
|
||||
boolean closed = false;
|
||||
CountDownLatch collectingLatch;
|
||||
|
||||
public TestClosableSink(CountDownLatch collectingLatch) {
|
||||
this.collectingLatch = collectingLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(SubsetConfiguration conf) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMetrics(MetricsRecord record) {
|
||||
while (!closed) {
|
||||
collectingLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HADOOP-11932
|
||||
*/
|
||||
@Test(timeout = 5000)
|
||||
public void testHangOnSinkRead() throws Exception {
|
||||
new ConfigBuilder().add("*.period", 8)
|
||||
.add("test.sink.test.class", TestSink.class.getName())
|
||||
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
|
||||
MetricsSystemImpl ms = new MetricsSystemImpl("Test");
|
||||
ms.start();
|
||||
try {
|
||||
CountDownLatch collectingLatch = new CountDownLatch(1);
|
||||
MetricsSink sink = new TestClosableSink(collectingLatch);
|
||||
ms.registerSink("closeableSink",
|
||||
"The sink will be used to test closeability", sink);
|
||||
// trigger metric collection first time
|
||||
ms.onTimerEvent();
|
||||
// Make sure that sink is collecting metrics
|
||||
assertTrue(collectingLatch.await(1, TimeUnit.SECONDS));
|
||||
} finally {
|
||||
ms.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Metrics(context="test")
|
||||
private static class TestSource {
|
||||
@Metric("C1 desc") MutableCounterLong c1;
|
||||
|
|
Loading…
Reference in New Issue