HBASE-26891 Make MetricsConnection scope configurable (#4365)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>

commit e5b008418f (parent 2240025349)
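For context, the sketch below shows how a client might use the configuration keys involved in this change. The keys "hbase.client.metrics.enable" and "hbase.client.metrics.scope" come from the diff that follows; the surrounding connection setup is an illustrative assumption, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MetricsScopeUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Enable client-side metrics collection (pre-existing key).
    conf.setBoolean("hbase.client.metrics.enable", true);
    // Give this connection a contextual scope so its JMX MBeans can be told
    // apart from other connections in the same JVM (key added by this commit).
    conf.set("hbase.client.metrics.scope", "orders-service-read");
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Use the connection; client metrics publish under the configured scope.
    }
  }
}

Left unset, the scope defaults to the connection's clusterId plus its hashCode, as implemented by MetricsConnection.getScope() in the diff below.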
AsyncConnectionImpl.java

@@ -126,7 +126,8 @@ public class AsyncConnectionImpl implements AsyncConnection {
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.registry = registry;
     if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
-      this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
+      String scope = MetricsConnection.getScope(conf, clusterId, this);
+      this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
     } else {
       this.metrics = Optional.empty();
     }
ConnectionImplementation.java

@@ -296,13 +296,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
-    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
-      this.metrics =
-        new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
-    } else {
-      this.metrics = null;
-    }
-    this.metaCache = new MetaCache(this.metrics);
 
     boolean shouldListen =
       conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -321,6 +314,15 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
     }
     retrieveClusterId();
 
+    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+      String scope = MetricsConnection.getScope(conf, clusterId, this);
+      this.metrics =
+        new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
+    } else {
+      this.metrics = null;
+    }
+    this.metaCache = new MetaCache(this.metrics);
+
     this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
 
     // Do we publish the status?
MetricsConnection.java

@@ -34,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
@@ -58,6 +59,34 @@ public class MetricsConnection implements StatisticTrackable {
   /** Set this key to {@code true} to enable metrics collection of client requests. */
   public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
 
+  /**
+   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}.
+   * The scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
+   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might
+   * be "foo@7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl.
+   * Users may set this key to give a more contextual name for this scope. For example, one might
+   * want to differentiate a read connection from a write connection by setting the scopes to
+   * "foo-read" and "foo-write" respectively.
+   *
+   * Scope is the only thing that lends any uniqueness to the metrics. Care should be taken to
+   * avoid using the same scope for multiple Connections, otherwise the metrics may aggregate in
+   * unforeseen ways.
+   */
+  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";
+
+  /**
+   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY}
+   * or by generating a default from the passed clusterId and connectionObj's hashCode.
+   * @param conf configuration for the connection
+   * @param clusterId clusterId for the connection
+   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance
+   *                      creating this MetricsConnection.
+   */
+  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
+    return conf.get(METRICS_SCOPE_KEY,
+      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
+  }
+
   private static final String CNT_BASE = "rpcCount_";
   private static final String DRTN_BASE = "rpcCallDurationMs_";
   private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
@@ -252,7 +281,7 @@ public class MetricsConnection implements StatisticTrackable {
 
   private final MetricRegistry registry;
   private final JmxReporter reporter;
-  private final String scope;
+  protected final String scope;
 
   private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
     @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
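The scope resolution added above reduces to: an explicitly configured "hbase.client.metrics.scope" wins, otherwise the scope defaults to clusterId + "@" + the connection's hex hashCode. The standalone sketch below mirrors getScope() for illustration only; the class and method names are hypothetical, not part of HBase.

import org.apache.hadoop.conf.Configuration;

public class ScopeResolutionSketch {
  // Mirrors MetricsConnection.getScope() from the hunk above.
  static String resolveScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get("hbase.client.metrics.scope",
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Object connection = new Object();
    // No key configured: falls back to something like "foo@1b6d3586".
    System.out.println(resolveScope(conf, "foo", connection));
    // Key configured: the value is used verbatim, e.g. to separate read and write connections.
    conf.set("hbase.client.metrics.scope", "foo-read");
    System.out.println(resolveScope(conf, "foo", connection));
  }
}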
TestMetricsConnection.java

@@ -18,14 +18,18 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import com.codahale.metrics.RatioGauge;
 import com.codahale.metrics.RatioGauge.Ratio;
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MetricsTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -35,9 +39,8 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
@@ -67,6 +70,49 @@ public class TestMetricsConnection {
     METRICS.shutdown();
   }
 
+  @Test
+  public void testMetricsConnectionScopeAsyncClient() throws IOException {
+    Configuration conf = new Configuration();
+    String clusterId = "foo";
+    String scope = "testScope";
+    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
+
+    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
+    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
+    assertTrue("Metrics should be present", metrics.isPresent());
+    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope);
+    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
+    impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
+
+    metrics = impl.getConnectionMetrics();
+    assertTrue("Metrics should be present", metrics.isPresent());
+    assertEquals(scope, metrics.get().scope);
+  }
+
+  @Test
+  public void testMetricsConnectionScopeBlockingClient() throws IOException {
+    Configuration conf = new Configuration();
+    String clusterId = "foo";
+    String scope = "testScope";
+    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
+
+    ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class);
+    Mockito.when(mockRegistry.getClusterId())
+      .thenReturn(CompletableFuture.completedFuture(clusterId));
+
+    ConnectionImplementation impl = new ConnectionImplementation(conf, null,
+      User.getCurrent(), mockRegistry);
+    MetricsConnection metrics = impl.getConnectionMetrics();
+    assertNotNull("Metrics should be present", metrics);
+    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.scope);
+    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
+    impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
+
+    metrics = impl.getConnectionMetrics();
+    assertNotNull("Metrics should be present", metrics);
+    assertEquals(scope, metrics.scope);
+  }
+
   @Test
   public void testStaticMetrics() throws IOException {
     final byte[] foo = Bytes.toBytes("foo");