HBASE-27466: Making metrics instance containing one or more connections. (#4874)

Signed-off-by: David Manning <67607031+d-c-manning@users.noreply.github.com>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Victor 2022-12-01 18:02:00 -08:00 committed by GitHub
parent 8a35f0ad6a
commit 320eca20fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 278 additions and 61 deletions

View File

@ -118,6 +118,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String metricsScope;
private final Optional<MetricsConnection> metrics;
private final ClusterStatusListener clusterStatusListener;
@ -128,6 +129,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
if (user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
@ -135,8 +137,8 @@ public class AsyncConnectionImpl implements AsyncConnection {
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
this.metrics =
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
@ -235,7 +237,9 @@ public class AsyncConnectionImpl implements AsyncConnection {
choreService = null;
}
}
metrics.ifPresent(MetricsConnection::shutdown);
if (metrics.isPresent()) {
MetricsConnection.deleteMetricsConnection(metricsScope);
}
ConnectionOverAsyncConnection c = this.conn;
if (c != null) {
c.closePool();

View File

@ -26,6 +26,8 @@ import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@ -47,12 +49,43 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
/**
* This class is for maintaining the various connection statistics and publishing them through the
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
* this class implicitly creates and "starts" instances of these classes; be sure to call
* {@link #shutdown()} to terminate the thread pools they allocate.
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
* the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
* all connections within this metrics instances are closed.
*/
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {
public final class MetricsConnection implements StatisticTrackable {
private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
new ConcurrentHashMap<>();
static MetricsConnection getMetricsConnection(final String scope,
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
if (metricsConnection == null) {
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
newMetricsConn.incrConnectionCount();
return newMetricsConn;
} else {
metricsConnection.addThreadPools(batchPool, metaPool);
metricsConnection.incrConnectionCount();
return metricsConnection;
}
});
}
static void deleteMetricsConnection(final String scope) {
METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
metricsConnection.decrConnectionCount();
if (metricsConnection.getConnectionCount() == 0) {
metricsConnection.shutdown();
return null;
}
return metricsConnection;
});
}
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
@ -231,7 +264,7 @@ public class MetricsConnection implements StatisticTrackable {
}
}
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
new ConcurrentHashMap<>();
public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
@ -272,7 +305,7 @@ public class MetricsConnection implements StatisticTrackable {
private final MetricRegistry registry;
private final JmxReporter reporter;
protected final String scope;
private final String scope;
private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override
@ -295,66 +328,93 @@ public class MetricsConnection implements StatisticTrackable {
}
};
// List of thread pool per connection of the metrics.
private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();
// static metrics
protected final Counter metaCacheHits;
protected final Counter metaCacheMisses;
protected final CallTracker getTracker;
protected final CallTracker scanTracker;
protected final CallTracker appendTracker;
protected final CallTracker deleteTracker;
protected final CallTracker incrementTracker;
protected final CallTracker putTracker;
protected final CallTracker multiTracker;
protected final RunnerStats runnerStats;
protected final Counter metaCacheNumClearServer;
protected final Counter metaCacheNumClearRegion;
protected final Counter hedgedReadOps;
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;
private final Counter connectionCount;
private final Counter metaCacheHits;
private final Counter metaCacheMisses;
private final CallTracker getTracker;
private final CallTracker scanTracker;
private final CallTracker appendTracker;
private final CallTracker deleteTracker;
private final CallTracker incrementTracker;
private final CallTracker putTracker;
private final CallTracker multiTracker;
private final RunnerStats runnerStats;
private final Counter metaCacheNumClearServer;
private final Counter metaCacheNumClearRegion;
private final Counter hedgedReadOps;
private final Counter hedgedReadWin;
private final Histogram concurrentCallsPerServerHist;
private final Histogram numActionsPerServerHist;
private final Counter nsLookups;
private final Counter nsLookupsFailed;
private final Timer overloadedBackoffTimer;
// dynamic metrics
// These maps are used to cache references to the metric instances that are managed by the
// registry. I don't think their use perfectly removes redundant allocations, but it's
// a big improvement over calling registry.newMetric each time.
protected final ConcurrentMap<String, Timer> rpcTimers =
private final ConcurrentMap<String, Timer> rpcTimers =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Counter> rpcCounters =
private final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = batchPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among batch pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
}
return Ratio.of(numerator, denominator);
}
});
this.registry.register(getMetaPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = metaPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among meta lookup pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
}
return Ratio.of(numerator, denominator);
}
});
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
this.metaCacheNumClearServer =
@ -396,8 +456,84 @@ public class MetricsConnection implements StatisticTrackable {
return registry;
}
public void shutdown() {
this.reporter.stop();
/** scope of the metrics object */
public String getMetricScope() {
return scope;
}
/** serverStats metric */
public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
return serverStats;
}
/** runnerStats metric */
public RunnerStats getRunnerStats() {
return runnerStats;
}
/** metaCacheNumClearServer metric */
public Counter getMetaCacheNumClearServer() {
return metaCacheNumClearServer;
}
/** metaCacheNumClearRegion metric */
public Counter getMetaCacheNumClearRegion() {
return metaCacheNumClearRegion;
}
/** hedgedReadOps metric */
public Counter getHedgedReadOps() {
return hedgedReadOps;
}
/** hedgedReadWin metric */
public Counter getHedgedReadWin() {
return hedgedReadWin;
}
/** numActionsPerServerHist metric */
public Histogram getNumActionsPerServerHist() {
return numActionsPerServerHist;
}
/** rpcCounters metric */
public ConcurrentMap<String, Counter> getRpcCounters() {
return rpcCounters;
}
/** getTracker metric */
public CallTracker getGetTracker() {
return getTracker;
}
/** scanTracker metric */
public CallTracker getScanTracker() {
return scanTracker;
}
/** multiTracker metric */
public CallTracker getMultiTracker() {
return multiTracker;
}
/** appendTracker metric */
public CallTracker getAppendTracker() {
return appendTracker;
}
/** deleteTracker metric */
public CallTracker getDeleteTracker() {
return deleteTracker;
}
/** incrementTracker metric */
public CallTracker getIncrementTracker() {
return incrementTracker;
}
/** putTracker metric */
public CallTracker getPutTracker() {
return putTracker;
}
/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
@ -457,6 +593,28 @@ public class MetricsConnection implements StatisticTrackable {
overloadedBackoffTimer.update(time, timeUnit);
}
/** Return the connection count of the metrics within a scope */
public long getConnectionCount() {
return connectionCount.getCount();
}
/** Increment the connection count of the metrics within a scope */
private void incrConnectionCount() {
connectionCount.inc();
}
/** Decrement the connection count of the metrics within a scope */
private void decrConnectionCount() {
connectionCount.dec();
}
/** Add thread pools of additional connections to the metrics */
private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
batchPools.add(batchPool);
metaPools.add(metaPool);
}
/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
@ -474,6 +632,10 @@ public class MetricsConnection implements StatisticTrackable {
.update(stats.getResponseSizeBytes());
}
private void shutdown() {
this.reporter.stop();
}
/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
int callsPerServer = stats.getConcurrentCallsPerServer();

View File

@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@ -61,14 +63,16 @@ public class TestMetricsConnection {
private static final ThreadPoolExecutor BATCH_POOL =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
private static final String MOCK_CONN_STR = "mocked-connection";
@BeforeClass
public static void beforeClass() {
METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
}
@AfterClass
public static void afterClass() {
METRICS.shutdown();
MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
}
@Test
@ -81,13 +85,59 @@ public class TestMetricsConnection {
AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope);
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
metrics.get().getMetricScope());
conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());
assertEquals(scope, metrics.get().scope);
assertEquals(scope, metrics.get().getMetricScope());
}
@Test
public void testMetricsWithMutiConnections() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
User user = User.getCurrent();
/* create multiple connections */
final int num = 3;
AsyncConnectionImpl impl;
List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
for (int i = 0; i < num; i++) {
impl = new AsyncConnectionImpl(conf, null, null, null, user);
connList.add(impl);
}
/* verify metrics presence */
impl = connList.get(0);
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present", metrics.isPresent());
/* verify connection count in a shared metrics */
long count = metrics.get().getConnectionCount();
assertEquals("Failed to verify connection count." + count, count, num);
/* close some connections */
for (int i = 0; i < num - 1; i++) {
connList.get(i).close();
}
/* verify metrics presence again */
impl = connList.get(num - 1);
metrics = impl.getConnectionMetrics();
assertTrue("Metrics should be present after some of connections are closed.",
metrics.isPresent());
/* verify count of remaining connections */
count = metrics.get().getConnectionCount();
assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);
/* shutdown */
impl.close();
}
@Test
@ -127,12 +177,13 @@ public class TestMetricsConnection {
}
for (String method : new String[] { "Get", "Scan", "Mutate" }) {
final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
final long metricVal = METRICS.rpcCounters.get(metricKey).getCount();
final long metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
}
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { METRICS.getTracker,
METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, METRICS.deleteTracker,
METRICS.incrementTracker, METRICS.putTracker }) {
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
METRICS.getPutTracker() }) {
assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());

View File

@ -142,14 +142,14 @@ public abstract class ClientPushbackTestBase {
// time reported by above debug logging has significantly deviated.
MetricsConnection metrics = getConnectionMetrics();
String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
MetricsConnection.RegionStats rsStats = metrics.serverStats.get(server).get(regionName);
MetricsConnection.RegionStats rsStats = metrics.getServerStats().get(server).get(regionName);
assertEquals(name, rsStats.name);
assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
(double) regionStats.getHeapOccupancyPercent(), 0.1);
assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
(double) regionStats.getMemStoreLoadPercent(), 0.1);
MetricsConnection.RunnerStats runnerStats = metrics.runnerStats;
MetricsConnection.RunnerStats runnerStats = metrics.getRunnerStats();
assertEquals(1, runnerStats.delayRunners.getCount());
assertEquals(1, runnerStats.normalRunners.getCount());

View File

@ -176,8 +176,8 @@ public class TestMetaCache {
table.put(put);
// obtain the client metrics
long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
// attempt a get on the test table
Get get = new Get(row);
@ -189,8 +189,8 @@ public class TestMetaCache {
}
// verify that no cache clearing took place
long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
assertEquals(preGetRegionClears, postGetRegionClears);
assertEquals(preGetServerClears, postGetServerClears);
}

View File

@ -75,9 +75,9 @@ public class TestMultiActionMetricsFromClient {
MetricsConnection metrics =
((AsyncConnectionImpl) conn.toAsyncConnection()).getConnectionMetrics().get();
assertEquals(1, metrics.multiTracker.reqHist.getCount());
assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15);
assertEquals(1, metrics.numActionsPerServerHist.getCount());
assertEquals(1, metrics.getMultiTracker().reqHist.getCount());
assertEquals(3, metrics.getNumActionsPerServerHist().getSnapshot().getMean(), 1e-15);
assertEquals(1, metrics.getNumActionsPerServerHist().getCount());
}
}
}

View File

@ -569,8 +569,8 @@ public class TestReplicasClient {
// reset
AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
Counter hedgedReadOps = conn.getConnectionMetrics().get().hedgedReadOps;
Counter hedgedReadWin = conn.getConnectionMetrics().get().hedgedReadWin;
Counter hedgedReadOps = conn.getConnectionMetrics().get().getHedgedReadOps();
Counter hedgedReadWin = conn.getConnectionMetrics().get().getHedgedReadWin();
hedgedReadOps.dec(hedgedReadOps.getCount());
hedgedReadWin.dec(hedgedReadWin.getCount());