diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 1eebcab4c93..cbc3eaa2e46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -126,7 +126,8 @@ public class AsyncConnectionImpl implements AsyncConnection { this.connConf = new AsyncConnectionConfiguration(conf); this.registry = registry; if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { - this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null)); + String scope = MetricsConnection.getScope(conf, clusterId, this); + this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null)); } else { this.metrics = Optional.empty(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 593afac9731..c360a6838d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -296,13 +296,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); - if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { - this.metrics = - new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool); - } else { - this.metrics = null; - } - this.metaCache = new MetaCache(this.metrics); boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); @@ -321,6 +314,15 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { } retrieveClusterId(); + if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { + String scope = MetricsConnection.getScope(conf, clusterId, this); + this.metrics = + new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool); + } else { + this.metrics = null; + } + this.metaCache = new MetaCache(this.metrics); + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); // Do we publish the status? diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 8566ec551e7..76f4a9ab8cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -34,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; @@ -58,6 +59,34 @@ public class MetricsConnection implements StatisticTrackable { /** Set this key to {@code true} to enable metrics collection of client requests. */ public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; + /** + * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. + * The scope is added to JMX MBean objectName, and defaults to a combination of the Connection's + * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might + * be "foo-7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. + * Users may set this key to give a more contextual name for this scope. For example, one might + * want to differentiate a read connection from a write connection by setting the scopes to + * "foo-read" and "foo-write" respectively. + * + * Scope is the only thing that lends any uniqueness to the metrics. Care should be taken to + * avoid using the same scope for multiple Connections, otherwise the metrics may aggregate in + * unforeseen ways. + */ + public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope"; + + /** + * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} + * or by generating a default from the passed clusterId and connectionObj's hashCode. + * @param conf configuration for the connection + * @param clusterId clusterId for the connection + * @param connectionObj either a Connection or AsyncConnectionImpl, the instance + * creating this MetricsConnection. + */ + static String getScope(Configuration conf, String clusterId, Object connectionObj) { + return conf.get(METRICS_SCOPE_KEY, + clusterId + "@" + Integer.toHexString(connectionObj.hashCode())); + } + private static final String CNT_BASE = "rpcCount_"; private static final String DRTN_BASE = "rpcCallDurationMs_"; private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; @@ -252,7 +281,7 @@ public class MetricsConnection implements StatisticTrackable { private final MetricRegistry registry; private final JmxReporter reporter; - private final String scope; + protected final String scope; private final NewMetric timerFactory = new NewMetric() { @Override public Timer newMetric(Class clazz, String name, String scope) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java index d48806def23..69d0389eb94 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -18,14 +18,18 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - import com.codahale.metrics.RatioGauge; import com.codahale.metrics.RatioGauge.Ratio; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MetricsTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -35,9 +39,8 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; - +import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; @@ -67,6 +70,49 @@ public class TestMetricsConnection { METRICS.shutdown(); } + @Test + public void testMetricsConnectionScopeAsyncClient() throws IOException { + Configuration conf = new Configuration(); + String clusterId = "foo"; + String scope = "testScope"; + conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); + + AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent()); + Optional metrics = impl.getConnectionMetrics(); + assertTrue("Metrics should be present", metrics.isPresent()); + assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope); + conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); + impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent()); + + metrics = impl.getConnectionMetrics(); + assertTrue("Metrics should be present", metrics.isPresent()); + assertEquals(scope, metrics.get().scope); + } + + @Test + public void testMetricsConnectionScopeBlockingClient() throws IOException { + Configuration conf = new Configuration(); + String clusterId = "foo"; + String scope = "testScope"; + conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); + + ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class); + Mockito.when(mockRegistry.getClusterId()) + .thenReturn(CompletableFuture.completedFuture(clusterId)); + + ConnectionImplementation impl = new ConnectionImplementation(conf, null, + User.getCurrent(), mockRegistry); + MetricsConnection metrics = impl.getConnectionMetrics(); + assertNotNull("Metrics should be present", metrics); + assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.scope); + conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); + impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry); + + metrics = impl.getConnectionMetrics(); + assertNotNull("Metrics should be present", metrics); + assertEquals(scope, metrics.scope); + } + @Test public void testStaticMetrics() throws IOException { final byte[] foo = Bytes.toBytes("foo");