HBASE-16855 Avoid NPE in MetricsConnection’s construction (ChiaPing Tsai)
This commit is contained in:
parent
08498c6848
commit
6df7554d29
|
@ -22,6 +22,7 @@ import com.google.protobuf.Descriptors.MethodDescriptor;
|
|||
import com.google.protobuf.Message;
|
||||
import com.yammer.metrics.core.Counter;
|
||||
import com.yammer.metrics.core.Histogram;
|
||||
import com.yammer.metrics.core.MetricName;
|
||||
import com.yammer.metrics.core.MetricsRegistry;
|
||||
import com.yammer.metrics.core.Timer;
|
||||
import com.yammer.metrics.reporting.JmxReporter;
|
||||
|
@ -294,24 +295,38 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) {
|
||||
this.scope = conn.toString();
|
||||
this.registry = new MetricsRegistry();
|
||||
final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
|
||||
final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
|
||||
|
||||
this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
|
||||
this.registry.newGauge(getExecutorPoolName(),
|
||||
new RatioGauge() {
|
||||
@Override protected double getNumerator() {
|
||||
ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
|
||||
if (batchPool == null) {
|
||||
return 0;
|
||||
}
|
||||
return batchPool.getActiveCount();
|
||||
}
|
||||
@Override protected double getDenominator() {
|
||||
ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
|
||||
if (batchPool == null) {
|
||||
return 0;
|
||||
}
|
||||
return batchPool.getMaximumPoolSize();
|
||||
}
|
||||
});
|
||||
this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
|
||||
this.registry.newGauge(getMetaPoolName(),
|
||||
new RatioGauge() {
|
||||
@Override protected double getNumerator() {
|
||||
ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
|
||||
if (metaPool == null) {
|
||||
return 0;
|
||||
}
|
||||
return metaPool.getActiveCount();
|
||||
}
|
||||
@Override protected double getDenominator() {
|
||||
ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
|
||||
if (metaPool == null) {
|
||||
return 0;
|
||||
}
|
||||
return metaPool.getMaximumPoolSize();
|
||||
}
|
||||
});
|
||||
|
@ -334,6 +349,21 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
this.reporter.start();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final MetricName getExecutorPoolName() {
|
||||
return new MetricName(getClass(), "executorPoolActiveThreads", scope);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final MetricName getMetaPoolName() {
|
||||
return new MetricName(getClass(), "metaPoolActiveThreads", scope);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
MetricsRegistry getMetricsRegistry() {
|
||||
return registry;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.reporter.shutdown();
|
||||
this.registry.shutdown();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.yammer.metrics.util.RatioGauge;
|
||||
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
|
@ -37,24 +38,43 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@Category({MetricsTests.class, SmallTests.class})
|
||||
public class TestMetricsConnection {
|
||||
|
||||
private static final ExecutorService BATCH_POOL = Executors.newFixedThreadPool(2);
|
||||
private static MetricsConnection METRICS;
|
||||
|
||||
private static final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private static final Runnable RUNNER = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (!closed.get() && !Thread.interrupted()) {
|
||||
TimeUnit.MILLISECONDS.sleep(10);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
HConnectionImplementation mocked = Mockito.mock(HConnectionImplementation.class);
|
||||
Mockito.when(mocked.toString()).thenReturn("mocked-connection");
|
||||
METRICS = new MetricsConnection(Mockito.mock(HConnectionImplementation.class));
|
||||
Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL);
|
||||
BATCH_POOL.submit(RUNNER);
|
||||
METRICS = new MetricsConnection(mocked);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
public static void afterClass() throws InterruptedException {
|
||||
METRICS.shutdown();
|
||||
BATCH_POOL.shutdownNow();
|
||||
BATCH_POOL.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -116,5 +136,11 @@ public class TestMetricsConnection {
|
|||
Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count());
|
||||
Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count());
|
||||
}
|
||||
RatioGauge executorMetrics = (RatioGauge) METRICS.getMetricsRegistry()
|
||||
.allMetrics().get(METRICS.getExecutorPoolName());
|
||||
RatioGauge metaMetrics = (RatioGauge) METRICS.getMetricsRegistry()
|
||||
.allMetrics().get(METRICS.getMetaPoolName());
|
||||
assertEquals((double) 0.5, executorMetrics.value(), 0);
|
||||
assertEquals(Double.NaN, metaMetrics.value(), 0);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue