Signed-off-by: David Manning <67607031+d-c-manning@users.noreply.github.com> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
7b4b4ce81e
commit
8c4f0e3fd1
|
@ -115,6 +115,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
private final String metricsScope;
|
||||
private final Optional<MetricsConnection> metrics;
|
||||
|
||||
private final ClusterStatusListener clusterStatusListener;
|
||||
|
@ -123,6 +124,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
|||
User user) {
|
||||
this.conf = conf;
|
||||
this.user = user;
|
||||
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
|
||||
|
||||
if (user.isLoginFromKeytab()) {
|
||||
spawnRenewalChore(user.getUGI());
|
||||
|
@ -130,8 +132,8 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
|||
this.connConf = new AsyncConnectionConfiguration(conf);
|
||||
this.registry = registry;
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
String scope = MetricsConnection.getScope(conf, clusterId, this);
|
||||
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
|
||||
this.metrics =
|
||||
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
|
||||
} else {
|
||||
this.metrics = Optional.empty();
|
||||
}
|
||||
|
@ -226,7 +228,9 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
|||
choreService = null;
|
||||
}
|
||||
}
|
||||
metrics.ifPresent(MetricsConnection::shutdown);
|
||||
if (metrics.isPresent()) {
|
||||
MetricsConnection.deleteMetricsConnection(metricsScope);
|
||||
}
|
||||
closed = true;
|
||||
}, "AsyncConnection.close");
|
||||
}
|
||||
|
|
|
@ -234,6 +234,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
private final RpcClient rpcClient;
|
||||
|
||||
private final MetaCache metaCache;
|
||||
|
||||
private String metricsScope = null;
|
||||
private final MetricsConnection metrics;
|
||||
|
||||
protected User user;
|
||||
|
@ -322,8 +324,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
retrieveClusterId();
|
||||
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
String scope = MetricsConnection.getScope(conf, clusterId, this);
|
||||
this.metrics = new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
|
||||
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
|
||||
this.metrics = MetricsConnection.getMetricsConnection(this.metricsScope, this::getBatchPool,
|
||||
this::getMetaLookupPool);
|
||||
} else {
|
||||
this.metrics = null;
|
||||
}
|
||||
|
@ -2161,7 +2164,7 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
closeMaster();
|
||||
shutdownPools();
|
||||
if (this.metrics != null) {
|
||||
this.metrics.shutdown();
|
||||
MetricsConnection.deleteMetricsConnection(metricsScope);
|
||||
}
|
||||
this.closed = true;
|
||||
if (this.registry != null) {
|
||||
|
|
|
@ -26,6 +26,8 @@ import com.codahale.metrics.JmxReporter;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.RatioGauge;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
@ -47,12 +49,43 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
|
|||
/**
|
||||
* This class is for maintaining the various connection statistics and publishing them through the
|
||||
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
|
||||
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
|
||||
* this class implicitly creates and "starts" instances of these classes; be sure to call
|
||||
* {@link #shutdown()} to terminate the thread pools they allocate.
|
||||
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
|
||||
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
|
||||
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
|
||||
* the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
|
||||
* all connections within this metrics instances are closed.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsConnection implements StatisticTrackable {
|
||||
public final class MetricsConnection implements StatisticTrackable {
|
||||
|
||||
private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
static MetricsConnection getMetricsConnection(final String scope,
|
||||
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
|
||||
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
|
||||
if (metricsConnection == null) {
|
||||
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
|
||||
newMetricsConn.incrConnectionCount();
|
||||
return newMetricsConn;
|
||||
} else {
|
||||
metricsConnection.addThreadPools(batchPool, metaPool);
|
||||
metricsConnection.incrConnectionCount();
|
||||
return metricsConnection;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static void deleteMetricsConnection(final String scope) {
|
||||
METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
|
||||
metricsConnection.decrConnectionCount();
|
||||
if (metricsConnection.getConnectionCount() == 0) {
|
||||
metricsConnection.shutdown();
|
||||
return null;
|
||||
}
|
||||
return metricsConnection;
|
||||
});
|
||||
}
|
||||
|
||||
/** Set this key to {@code true} to enable metrics collection of client requests. */
|
||||
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
|
||||
|
@ -231,7 +264,7 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
}
|
||||
}
|
||||
|
||||
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
|
||||
private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
|
||||
|
@ -272,7 +305,7 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
|
||||
private final MetricRegistry registry;
|
||||
private final JmxReporter reporter;
|
||||
protected final String scope;
|
||||
private final String scope;
|
||||
|
||||
private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
|
||||
@Override
|
||||
|
@ -295,66 +328,93 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
}
|
||||
};
|
||||
|
||||
// List of thread pool per connection of the metrics.
|
||||
private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
|
||||
private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();
|
||||
|
||||
// static metrics
|
||||
|
||||
protected final Counter metaCacheHits;
|
||||
protected final Counter metaCacheMisses;
|
||||
protected final CallTracker getTracker;
|
||||
protected final CallTracker scanTracker;
|
||||
protected final CallTracker appendTracker;
|
||||
protected final CallTracker deleteTracker;
|
||||
protected final CallTracker incrementTracker;
|
||||
protected final CallTracker putTracker;
|
||||
protected final CallTracker multiTracker;
|
||||
protected final RunnerStats runnerStats;
|
||||
protected final Counter metaCacheNumClearServer;
|
||||
protected final Counter metaCacheNumClearRegion;
|
||||
protected final Counter hedgedReadOps;
|
||||
protected final Counter hedgedReadWin;
|
||||
protected final Histogram concurrentCallsPerServerHist;
|
||||
protected final Histogram numActionsPerServerHist;
|
||||
protected final Counter nsLookups;
|
||||
protected final Counter nsLookupsFailed;
|
||||
protected final Timer overloadedBackoffTimer;
|
||||
private final Counter connectionCount;
|
||||
private final Counter metaCacheHits;
|
||||
private final Counter metaCacheMisses;
|
||||
private final CallTracker getTracker;
|
||||
private final CallTracker scanTracker;
|
||||
private final CallTracker appendTracker;
|
||||
private final CallTracker deleteTracker;
|
||||
private final CallTracker incrementTracker;
|
||||
private final CallTracker putTracker;
|
||||
private final CallTracker multiTracker;
|
||||
private final RunnerStats runnerStats;
|
||||
private final Counter metaCacheNumClearServer;
|
||||
private final Counter metaCacheNumClearRegion;
|
||||
private final Counter hedgedReadOps;
|
||||
private final Counter hedgedReadWin;
|
||||
private final Histogram concurrentCallsPerServerHist;
|
||||
private final Histogram numActionsPerServerHist;
|
||||
private final Counter nsLookups;
|
||||
private final Counter nsLookupsFailed;
|
||||
private final Timer overloadedBackoffTimer;
|
||||
|
||||
// dynamic metrics
|
||||
|
||||
// These maps are used to cache references to the metric instances that are managed by the
|
||||
// registry. I don't think their use perfectly removes redundant allocations, but it's
|
||||
// a big improvement over calling registry.newMetric each time.
|
||||
protected final ConcurrentMap<String, Timer> rpcTimers =
|
||||
private final ConcurrentMap<String, Timer> rpcTimers =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
|
||||
private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
|
||||
CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
protected final ConcurrentMap<String, Counter> rpcCounters =
|
||||
private final ConcurrentMap<String, Counter> rpcCounters =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
|
||||
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
|
||||
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
|
||||
Supplier<ThreadPoolExecutor> metaPool) {
|
||||
this.scope = scope;
|
||||
addThreadPools(batchPool, metaPool);
|
||||
this.registry = new MetricRegistry();
|
||||
this.registry.register(getExecutorPoolName(), new RatioGauge() {
|
||||
@Override
|
||||
protected Ratio getRatio() {
|
||||
ThreadPoolExecutor pool = batchPool.get();
|
||||
if (pool == null) {
|
||||
return Ratio.of(0, 0);
|
||||
int numerator = 0;
|
||||
int denominator = 0;
|
||||
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
|
||||
ThreadPoolExecutor pool = poolSupplier.get();
|
||||
if (pool != null) {
|
||||
int activeCount = pool.getActiveCount();
|
||||
int maxPoolSize = pool.getMaximumPoolSize();
|
||||
/* The max thread usage ratio among batch pools of all connections */
|
||||
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
|
||||
numerator = activeCount;
|
||||
denominator = maxPoolSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
|
||||
return Ratio.of(numerator, denominator);
|
||||
}
|
||||
});
|
||||
this.registry.register(getMetaPoolName(), new RatioGauge() {
|
||||
@Override
|
||||
protected Ratio getRatio() {
|
||||
ThreadPoolExecutor pool = metaPool.get();
|
||||
if (pool == null) {
|
||||
return Ratio.of(0, 0);
|
||||
int numerator = 0;
|
||||
int denominator = 0;
|
||||
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
|
||||
ThreadPoolExecutor pool = poolSupplier.get();
|
||||
if (pool != null) {
|
||||
int activeCount = pool.getActiveCount();
|
||||
int maxPoolSize = pool.getMaximumPoolSize();
|
||||
/* The max thread usage ratio among meta lookup pools of all connections */
|
||||
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
|
||||
numerator = activeCount;
|
||||
denominator = maxPoolSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
|
||||
return Ratio.of(numerator, denominator);
|
||||
}
|
||||
});
|
||||
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
|
||||
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
|
||||
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
|
||||
this.metaCacheNumClearServer =
|
||||
|
@ -397,8 +457,84 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
return registry;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.reporter.stop();
|
||||
/** scope of the metrics object */
|
||||
public String getMetricScope() {
|
||||
return scope;
|
||||
}
|
||||
|
||||
/** serverStats metric */
|
||||
public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
|
||||
return serverStats;
|
||||
}
|
||||
|
||||
/** runnerStats metric */
|
||||
public RunnerStats getRunnerStats() {
|
||||
return runnerStats;
|
||||
}
|
||||
|
||||
/** metaCacheNumClearServer metric */
|
||||
public Counter getMetaCacheNumClearServer() {
|
||||
return metaCacheNumClearServer;
|
||||
}
|
||||
|
||||
/** metaCacheNumClearRegion metric */
|
||||
public Counter getMetaCacheNumClearRegion() {
|
||||
return metaCacheNumClearRegion;
|
||||
}
|
||||
|
||||
/** hedgedReadOps metric */
|
||||
public Counter getHedgedReadOps() {
|
||||
return hedgedReadOps;
|
||||
}
|
||||
|
||||
/** hedgedReadWin metric */
|
||||
public Counter getHedgedReadWin() {
|
||||
return hedgedReadWin;
|
||||
}
|
||||
|
||||
/** numActionsPerServerHist metric */
|
||||
public Histogram getNumActionsPerServerHist() {
|
||||
return numActionsPerServerHist;
|
||||
}
|
||||
|
||||
/** rpcCounters metric */
|
||||
public ConcurrentMap<String, Counter> getRpcCounters() {
|
||||
return rpcCounters;
|
||||
}
|
||||
|
||||
/** getTracker metric */
|
||||
public CallTracker getGetTracker() {
|
||||
return getTracker;
|
||||
}
|
||||
|
||||
/** scanTracker metric */
|
||||
public CallTracker getScanTracker() {
|
||||
return scanTracker;
|
||||
}
|
||||
|
||||
/** multiTracker metric */
|
||||
public CallTracker getMultiTracker() {
|
||||
return multiTracker;
|
||||
}
|
||||
|
||||
/** appendTracker metric */
|
||||
public CallTracker getAppendTracker() {
|
||||
return appendTracker;
|
||||
}
|
||||
|
||||
/** deleteTracker metric */
|
||||
public CallTracker getDeleteTracker() {
|
||||
return deleteTracker;
|
||||
}
|
||||
|
||||
/** incrementTracker metric */
|
||||
public CallTracker getIncrementTracker() {
|
||||
return incrementTracker;
|
||||
}
|
||||
|
||||
/** putTracker metric */
|
||||
public CallTracker getPutTracker() {
|
||||
return putTracker;
|
||||
}
|
||||
|
||||
/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
|
||||
|
@ -457,6 +593,28 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
overloadedBackoffTimer.update(time, timeUnit);
|
||||
}
|
||||
|
||||
/** Return the connection count of the metrics within a scope */
|
||||
public long getConnectionCount() {
|
||||
return connectionCount.getCount();
|
||||
}
|
||||
|
||||
/** Increment the connection count of the metrics within a scope */
|
||||
private void incrConnectionCount() {
|
||||
connectionCount.inc();
|
||||
}
|
||||
|
||||
/** Decrement the connection count of the metrics within a scope */
|
||||
private void decrConnectionCount() {
|
||||
connectionCount.dec();
|
||||
}
|
||||
|
||||
/** Add thread pools of additional connections to the metrics */
|
||||
private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
|
||||
Supplier<ThreadPoolExecutor> metaPool) {
|
||||
batchPools.add(batchPool);
|
||||
metaPools.add(metaPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
|
||||
*/
|
||||
|
@ -474,6 +632,10 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
.update(stats.getResponseSizeBytes());
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
this.reporter.stop();
|
||||
}
|
||||
|
||||
/** Report RPC context to metrics system. */
|
||||
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
|
||||
int callsPerServer = stats.getConcurrentCallsPerServer();
|
||||
|
|
|
@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
|
|||
import com.codahale.metrics.RatioGauge;
|
||||
import com.codahale.metrics.RatioGauge.Ratio;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -64,14 +66,16 @@ public class TestMetricsConnection {
|
|||
private static final ThreadPoolExecutor BATCH_POOL =
|
||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
|
||||
|
||||
private static final String MOCK_CONN_STR = "mocked-connection";
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
|
||||
METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
METRICS.shutdown();
|
||||
MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -84,13 +88,59 @@ public class TestMetricsConnection {
|
|||
AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
|
||||
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
|
||||
assertTrue("Metrics should be present", metrics.isPresent());
|
||||
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope);
|
||||
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
|
||||
metrics.get().getMetricScope());
|
||||
conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
|
||||
impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
|
||||
|
||||
metrics = impl.getConnectionMetrics();
|
||||
assertTrue("Metrics should be present", metrics.isPresent());
|
||||
assertEquals(scope, metrics.get().scope);
|
||||
assertEquals(scope, metrics.get().getMetricScope());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetricsWithMutiConnections() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||
conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
|
||||
|
||||
User user = User.getCurrent();
|
||||
|
||||
/* create multiple connections */
|
||||
final int num = 3;
|
||||
AsyncConnectionImpl impl;
|
||||
List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
|
||||
for (int i = 0; i < num; i++) {
|
||||
impl = new AsyncConnectionImpl(conf, null, null, user);
|
||||
connList.add(impl);
|
||||
}
|
||||
|
||||
/* verify metrics presence */
|
||||
impl = connList.get(0);
|
||||
Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
|
||||
assertTrue("Metrics should be present", metrics.isPresent());
|
||||
|
||||
/* verify connection count in a shared metrics */
|
||||
long count = metrics.get().getConnectionCount();
|
||||
assertEquals("Failed to verify connection count." + count, count, num);
|
||||
|
||||
/* close some connections */
|
||||
for (int i = 0; i < num - 1; i++) {
|
||||
connList.get(i).close();
|
||||
}
|
||||
|
||||
/* verify metrics presence again */
|
||||
impl = connList.get(num - 1);
|
||||
metrics = impl.getConnectionMetrics();
|
||||
assertTrue("Metrics should be present after some of connections are closed.",
|
||||
metrics.isPresent());
|
||||
|
||||
/* verify count of remaining connections */
|
||||
count = metrics.get().getConnectionCount();
|
||||
assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);
|
||||
|
||||
/* shutdown */
|
||||
impl.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -108,13 +158,13 @@ public class TestMetricsConnection {
|
|||
new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
|
||||
MetricsConnection metrics = impl.getConnectionMetrics();
|
||||
assertNotNull("Metrics should be present", metrics);
|
||||
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.scope);
|
||||
assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.getMetricScope());
|
||||
conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
|
||||
impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
|
||||
|
||||
metrics = impl.getConnectionMetrics();
|
||||
assertNotNull("Metrics should be present", metrics);
|
||||
assertEquals(scope, metrics.scope);
|
||||
assertEquals(scope, metrics.getMetricScope());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -154,12 +204,13 @@ public class TestMetricsConnection {
|
|||
}
|
||||
for (String method : new String[] { "Get", "Scan", "Mutate" }) {
|
||||
final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
|
||||
final long metricVal = METRICS.rpcCounters.get(metricKey).getCount();
|
||||
final long metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
|
||||
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
|
||||
}
|
||||
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { METRICS.getTracker,
|
||||
METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, METRICS.deleteTracker,
|
||||
METRICS.incrementTracker, METRICS.putTracker }) {
|
||||
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
|
||||
METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
|
||||
METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
|
||||
METRICS.getPutTracker() }) {
|
||||
assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
|
||||
assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
|
||||
assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
|
||||
|
|
|
@ -142,14 +142,14 @@ public abstract class ClientPushbackTestBase {
|
|||
// time reported by above debug logging has significantly deviated.
|
||||
MetricsConnection metrics = getConnectionMetrics();
|
||||
String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
|
||||
MetricsConnection.RegionStats rsStats = metrics.serverStats.get(server).get(regionName);
|
||||
MetricsConnection.RegionStats rsStats = metrics.getServerStats().get(server).get(regionName);
|
||||
assertEquals(name, rsStats.name);
|
||||
assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
|
||||
(double) regionStats.getHeapOccupancyPercent(), 0.1);
|
||||
assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
|
||||
(double) regionStats.getMemStoreLoadPercent(), 0.1);
|
||||
|
||||
MetricsConnection.RunnerStats runnerStats = metrics.runnerStats;
|
||||
MetricsConnection.RunnerStats runnerStats = metrics.getRunnerStats();
|
||||
|
||||
assertEquals(1, runnerStats.delayRunners.getCount());
|
||||
assertEquals(1, runnerStats.normalRunners.getCount());
|
||||
|
|
|
@ -214,7 +214,7 @@ public class TestClientOperationTimeout {
|
|||
|
||||
MetricsConnection metrics =
|
||||
((ConnectionImplementation) specialConnection).getConnectionMetrics();
|
||||
long metaCacheNumClearServerPreFailure = metrics.metaCacheNumClearServer.getCount();
|
||||
long metaCacheNumClearServerPreFailure = metrics.getMetaCacheNumClearServer().getCount();
|
||||
|
||||
DELAY_META_SCAN = 400;
|
||||
List<Get> gets = new ArrayList<>();
|
||||
|
@ -232,7 +232,7 @@ public class TestClientOperationTimeout {
|
|||
|
||||
// verify we do not clear the cache in this situation otherwise we will create pathological
|
||||
// feedback loop with multigets See: HBASE-27487
|
||||
long metaCacheNumClearServerPostFailure = metrics.metaCacheNumClearServer.getCount();
|
||||
long metaCacheNumClearServerPostFailure = metrics.getMetaCacheNumClearServer().getCount();
|
||||
Assert.assertEquals(metaCacheNumClearServerPreFailure, metaCacheNumClearServerPostFailure);
|
||||
|
||||
for (Throwable cause : expected.getCauses()) {
|
||||
|
|
|
@ -243,8 +243,8 @@ public class TestMetaCache {
|
|||
|
||||
// obtain the client metrics
|
||||
MetricsConnection metrics = conn.getConnectionMetrics();
|
||||
long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
|
||||
long preGetServerClears = metrics.metaCacheNumClearServer.getCount();
|
||||
long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
|
||||
long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
|
||||
|
||||
// attempt a get on the test table
|
||||
Get get = new Get(row);
|
||||
|
@ -256,8 +256,8 @@ public class TestMetaCache {
|
|||
}
|
||||
|
||||
// verify that no cache clearing took place
|
||||
long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
|
||||
long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
|
||||
long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount();
|
||||
long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount();
|
||||
assertEquals(preGetRegionClears, postGetRegionClears);
|
||||
assertEquals(preGetServerClears, postGetServerClears);
|
||||
} finally {
|
||||
|
|
|
@ -77,9 +77,9 @@ public class TestMultiActionMetricsFromClient {
|
|||
mutator.close();
|
||||
|
||||
MetricsConnection metrics = conn.getConnectionMetrics();
|
||||
assertEquals(1, metrics.multiTracker.reqHist.getCount());
|
||||
assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15);
|
||||
assertEquals(1, metrics.numActionsPerServerHist.getCount());
|
||||
assertEquals(1, metrics.getMultiTracker().reqHist.getCount());
|
||||
assertEquals(3, metrics.getNumActionsPerServerHist().getSnapshot().getMean(), 1e-15);
|
||||
assertEquals(1, metrics.getNumActionsPerServerHist().getCount());
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
|
|
|
@ -542,8 +542,8 @@ public class TestReplicasClient {
|
|||
|
||||
// reset
|
||||
ClusterConnection connection = (ClusterConnection) HTU.getConnection();
|
||||
Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
|
||||
Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
|
||||
Counter hedgedReadOps = connection.getConnectionMetrics().getHedgedReadOps();
|
||||
Counter hedgedReadWin = connection.getConnectionMetrics().getHedgedReadWin();
|
||||
hedgedReadOps.dec(hedgedReadOps.getCount());
|
||||
hedgedReadWin.dec(hedgedReadWin.getCount());
|
||||
|
||||
|
|
Loading…
Reference in New Issue