HBASE-27853 Add client side table metrics for rpc calls and request latency.
This commit is contained in:
parent
bca7caa2f2
commit
4d70a22d9a
|
@ -137,8 +137,8 @@ public class AsyncConnectionImpl implements AsyncConnection {
|
|||
this.connConf = new AsyncConnectionConfiguration(conf);
|
||||
this.registry = registry;
|
||||
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
|
||||
this.metrics =
|
||||
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
|
||||
this.metrics = Optional
|
||||
.of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null));
|
||||
} else {
|
||||
this.metrics = Optional.empty();
|
||||
}
|
||||
|
|
|
@ -359,7 +359,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
private void closeScanner() {
|
||||
incRPCCallsMetrics(scanMetrics, regionServerRemote);
|
||||
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
|
||||
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
|
||||
ScanRequest req = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
|
||||
this.scannerId, 0, true, false);
|
||||
stub.scan(controller, req, resp -> {
|
||||
if (controller.failed()) {
|
||||
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
|
||||
|
@ -584,8 +585,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
incRPCRetriesMetrics(scanMetrics, regionServerRemote);
|
||||
}
|
||||
resetController(controller, callTimeoutNs, priority);
|
||||
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
|
||||
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
|
||||
ScanRequest req = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scannerId,
|
||||
scan.getCaching(), false, nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
|
||||
final Context context = Context.current();
|
||||
stub.scan(controller, req, resp -> {
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
|
@ -606,8 +607,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
incRPCCallsMetrics(scanMetrics, regionServerRemote);
|
||||
nextCallSeq++;
|
||||
resetController(controller, rpcTimeoutNs, priority);
|
||||
ScanRequest req =
|
||||
RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
|
||||
ScanRequest req = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scannerId,
|
||||
0, false, nextCallSeq, false, true, -1);
|
||||
stub.scan(controller, req, resp -> {
|
||||
});
|
||||
}
|
||||
|
|
|
@ -34,39 +34,46 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various connection statistics and publishing them through the
|
||||
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
|
||||
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
|
||||
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
|
||||
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
|
||||
* the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
|
||||
* all connections within this metrics instances are closed.
|
||||
* {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
|
||||
* "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
|
||||
* terminate the thread pools they allocate. The metrics reporter will be shutdown
|
||||
* {@link #shutdown()} when all connections within this metrics instances are closed.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class MetricsConnection implements StatisticTrackable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MetricsConnection.class);
|
||||
|
||||
private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
static MetricsConnection getMetricsConnection(final String scope,
|
||||
static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
|
||||
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
|
||||
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
|
||||
if (metricsConnection == null) {
|
||||
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
|
||||
MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
|
||||
newMetricsConn.incrConnectionCount();
|
||||
return newMetricsConn;
|
||||
} else {
|
||||
|
@ -91,6 +98,10 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
/** Set this key to {@code true} to enable metrics collection of client requests. */
|
||||
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
|
||||
|
||||
/** Set this key to {@code true} to enable table metrics collection of client requests. */
|
||||
public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
|
||||
"hbase.client.table.metrics.enable";
|
||||
|
||||
/**
|
||||
* Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
|
||||
* scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
|
||||
|
@ -374,8 +385,11 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
private final ConcurrentMap<String, Counter> rpcCounters =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
|
||||
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
|
||||
Supplier<ThreadPoolExecutor> metaPool) {
|
||||
private final Configuration conf;
|
||||
|
||||
private MetricsConnection(Configuration conf, String scope,
|
||||
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
|
||||
this.conf = conf;
|
||||
this.scope = scope;
|
||||
addThreadPools(batchPool, metaPool);
|
||||
this.registry = new MetricRegistry();
|
||||
|
@ -506,6 +520,16 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
return rpcCounters;
|
||||
}
|
||||
|
||||
/** rpcTimers metric */
|
||||
public ConcurrentMap<String, Timer> getRpcTimers() {
|
||||
return rpcTimers;
|
||||
}
|
||||
|
||||
/** rpcHistograms metric */
|
||||
public ConcurrentMap<String, Histogram> getRpcHistograms() {
|
||||
return rpcHistograms;
|
||||
}
|
||||
|
||||
/** getTracker metric */
|
||||
public CallTracker getGetTracker() {
|
||||
return getTracker;
|
||||
|
@ -671,10 +695,13 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
// this implementation is tied directly to protobuf implementation details. would be better
|
||||
// if we could dispatch based on something static, ie, request Message type.
|
||||
if (method.getService() == ClientService.getDescriptor()) {
|
||||
String name = methodName;
|
||||
switch (method.getIndex()) {
|
||||
case 0:
|
||||
assert "Get".equals(method.getName());
|
||||
getTracker.updateRpc(stats);
|
||||
updateTableMetric(name,
|
||||
parseTableName(((ClientProtos.GetRequest) param).getRegion(), param), stats, e);
|
||||
return;
|
||||
case 1:
|
||||
assert "Mutate".equals(method.getName());
|
||||
|
@ -682,22 +709,31 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
switch (mutationType) {
|
||||
case APPEND:
|
||||
appendTracker.updateRpc(stats);
|
||||
return;
|
||||
name += "(Append)";
|
||||
break;
|
||||
case DELETE:
|
||||
deleteTracker.updateRpc(stats);
|
||||
return;
|
||||
name += "(Delete)";
|
||||
break;
|
||||
case INCREMENT:
|
||||
incrementTracker.updateRpc(stats);
|
||||
return;
|
||||
name += "(Increment)";
|
||||
break;
|
||||
case PUT:
|
||||
putTracker.updateRpc(stats);
|
||||
return;
|
||||
name += "(Put)";
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unrecognized mutation type " + mutationType);
|
||||
}
|
||||
updateTableMetric(name, parseTableName(((MutateRequest) param).getRegion(), param), stats,
|
||||
e);
|
||||
return;
|
||||
case 2:
|
||||
assert "Scan".equals(method.getName());
|
||||
scanTracker.updateRpc(stats);
|
||||
updateTableMetric(name,
|
||||
parseTableName(((ClientProtos.ScanRequest) param).getRegion(), param), stats, e);
|
||||
return;
|
||||
case 3:
|
||||
assert "BulkLoadHFile".equals(method.getName());
|
||||
|
@ -723,6 +759,8 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
assert "Multi".equals(method.getName());
|
||||
numActionsPerServerHist.update(stats.getNumActionsPerServer());
|
||||
multiTracker.updateRpc(stats);
|
||||
updateTableMetric(name, parseTableName(
|
||||
((ClientProtos.MultiRequest) param).getRegionAction(0).getRegion(), param), stats, e);
|
||||
return;
|
||||
default:
|
||||
throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
|
||||
|
@ -732,6 +770,32 @@ public final class MetricsConnection implements StatisticTrackable {
|
|||
updateRpcGeneric(methodName, stats);
|
||||
}
|
||||
|
||||
/** Report table rpc context to metrics system. */
|
||||
private void updateTableMetric(String methodName, String tableName, CallStats stats,
|
||||
Throwable e) {
|
||||
if (
|
||||
conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false) && methodName != null
|
||||
&& tableName != null
|
||||
) {
|
||||
String metricKey = methodName + "_" + tableName;
|
||||
updateRpcGeneric(metricKey, stats);
|
||||
if (e != null) {
|
||||
getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Parse table from region. */
|
||||
private String parseTableName(HBaseProtos.RegionSpecifier regionSpecifier, Message param) {
|
||||
if (regionSpecifier == null || StringUtils.isEmpty(regionSpecifier.getValue().toStringUtf8())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Region name is empty, request: {}", param);
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
return regionSpecifier.getValue().toStringUtf8().split(",")[0];
|
||||
}
|
||||
|
||||
public void incrCacheDroppingExceptions(Object exception) {
|
||||
getMetric(
|
||||
CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
|
||||
|
|
|
@ -346,10 +346,9 @@ public final class RequestConverter {
|
|||
public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows,
|
||||
boolean closeScanner) throws IOException {
|
||||
ScanRequest.Builder builder = ScanRequest.newBuilder();
|
||||
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
|
||||
builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
|
||||
builder.setNumberOfRows(numberOfRows);
|
||||
builder.setCloseScanner(closeScanner);
|
||||
builder.setRegion(region);
|
||||
builder.setScan(ProtobufUtil.toScan(scan));
|
||||
builder.setClientHandlesPartials(true);
|
||||
builder.setClientHandlesHeartbeats(true);
|
||||
|
@ -364,9 +363,10 @@ public final class RequestConverter {
|
|||
* Create a protocol buffer ScanRequest for a scanner id
|
||||
* @return a scan request
|
||||
*/
|
||||
public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
|
||||
boolean trackMetrics) {
|
||||
public static ScanRequest buildScanRequest(byte[] regionName, long scannerId, int numberOfRows,
|
||||
boolean closeScanner, boolean trackMetrics) {
|
||||
ScanRequest.Builder builder = ScanRequest.newBuilder();
|
||||
builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
|
||||
builder.setNumberOfRows(numberOfRows);
|
||||
builder.setCloseScanner(closeScanner);
|
||||
builder.setScannerId(scannerId);
|
||||
|
@ -380,9 +380,10 @@ public final class RequestConverter {
|
|||
* Create a protocol buffer ScanRequest for a scanner id
|
||||
* @return a scan request
|
||||
*/
|
||||
public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner,
|
||||
long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) {
|
||||
public static ScanRequest buildScanRequest(byte[] regionName, long scannerId, int numberOfRows,
|
||||
boolean closeScanner, long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) {
|
||||
ScanRequest.Builder builder = ScanRequest.newBuilder();
|
||||
builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
|
||||
builder.setNumberOfRows(numberOfRows);
|
||||
builder.setCloseScanner(closeScanner);
|
||||
builder.setScannerId(scannerId);
|
||||
|
|
|
@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue;
|
|||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.RatioGauge;
|
||||
import com.codahale.metrics.RatioGauge.Ratio;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -38,15 +40,20 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MetricsTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||
|
@ -56,25 +63,37 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
|
||||
public class TestMetricsConnection {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMetricsConnection.class);
|
||||
|
||||
private static final Configuration conf = new Configuration();
|
||||
private static MetricsConnection METRICS;
|
||||
private static final ThreadPoolExecutor BATCH_POOL =
|
||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
|
||||
|
||||
private static final String MOCK_CONN_STR = "mocked-connection";
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
|
||||
@Parameter()
|
||||
public boolean tableMetricsEnable;
|
||||
|
||||
@Parameters
|
||||
public static List<Boolean> params() {
|
||||
return Arrays.asList(false, true);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
@Before
|
||||
public void before() {
|
||||
conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnable);
|
||||
METRICS =
|
||||
MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
|
||||
}
|
||||
|
||||
|
@ -99,7 +118,7 @@ public class TestMetricsConnection {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMetricsWithMutiConnections() throws IOException {
|
||||
public void testMetricsWithMultiConnections() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||
conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
|
||||
|
@ -146,18 +165,28 @@ public class TestMetricsConnection {
|
|||
@Test
|
||||
public void testStaticMetrics() throws IOException {
|
||||
final byte[] foo = Bytes.toBytes("foo");
|
||||
final RegionSpecifier region = RegionSpecifier.newBuilder().setValue(ByteString.EMPTY)
|
||||
.setType(RegionSpecifierType.REGION_NAME).build();
|
||||
String table = "TableX";
|
||||
final RegionSpecifier region = RegionSpecifier.newBuilder()
|
||||
.setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build();
|
||||
final int loop = 5;
|
||||
|
||||
for (int i = 0; i < loop; i++) {
|
||||
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
|
||||
GetRequest.getDefaultInstance(), MetricsConnection.newCallStats(), null);
|
||||
GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(),
|
||||
MetricsConnection.newCallStats(), null);
|
||||
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
|
||||
ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
|
||||
ScanRequest.newBuilder().setRegion(region)
|
||||
.setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(),
|
||||
MetricsConnection.newCallStats(),
|
||||
new RemoteWithExtrasException("java.io.IOException", null, false, false));
|
||||
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
|
||||
MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
|
||||
MultiRequest.newBuilder()
|
||||
.addRegionAction(ClientProtos.RegionAction.newBuilder()
|
||||
.addAction(
|
||||
ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build())
|
||||
.setRegion(region).build())
|
||||
.build(),
|
||||
MetricsConnection.newCallStats(),
|
||||
new CallTimeoutException("test with CallTimeoutException"));
|
||||
METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
|
||||
MutateRequest.newBuilder()
|
||||
|
@ -193,7 +222,9 @@ public class TestMetricsConnection {
|
|||
metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
|
||||
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
|
||||
|
||||
metricKey = rpcFailureCountPrefix + method;
|
||||
metricKey = tableMetricsEnable
|
||||
? rpcFailureCountPrefix + method + "_" + table
|
||||
: rpcFailureCountPrefix + method;
|
||||
counter = METRICS.getRpcCounters().get(metricKey);
|
||||
metricVal = (counter != null) ? counter.getCount() : 0;
|
||||
if (method.equals("Get") || method.equals("Mutate")) {
|
||||
|
@ -223,6 +254,45 @@ public class TestMetricsConnection {
|
|||
metricVal = (counter != null) ? counter.getCount() : 0;
|
||||
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == loop * 2);
|
||||
|
||||
Timer timer;
|
||||
String numOpsSuffix = "_num_ops";
|
||||
String p95Suffix = "_95th_percentile";
|
||||
String p99Suffix = "_99th_percentile";
|
||||
String service = ClientService.getDescriptor().getName();
|
||||
for (String method : new String[] { "Get", "Scan", "Multi" }) {
|
||||
metricKey = "rpcCallDurationMs_" + service + "_" + method + "_" + table;
|
||||
timer = METRICS.getRpcTimers().get(metricKey);
|
||||
if (tableMetricsEnable) {
|
||||
assert timer != null;
|
||||
long numOps = timer.getCount();
|
||||
double p95 = timer.getSnapshot().get95thPercentile();
|
||||
double p99 = timer.getSnapshot().get99thPercentile();
|
||||
assertTrue("metric: " + metricKey + numOpsSuffix + " val: " + numOps, numOps == loop);
|
||||
assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
|
||||
assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
|
||||
} else {
|
||||
assert timer == null;
|
||||
}
|
||||
}
|
||||
|
||||
String method = "Mutate";
|
||||
for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
|
||||
metricKey =
|
||||
"rpcCallDurationMs_" + service + "_" + method + "(" + mutationType + ")" + "_" + table;
|
||||
timer = METRICS.getRpcTimers().get(metricKey);
|
||||
if (tableMetricsEnable) {
|
||||
assert timer != null;
|
||||
long numOps = timer.getCount();
|
||||
double p95 = timer.getSnapshot().get95thPercentile();
|
||||
double p99 = timer.getSnapshot().get99thPercentile();
|
||||
assertTrue("metric: " + metricKey + numOpsSuffix + " val: " + numOps, numOps == loop);
|
||||
assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
|
||||
assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
|
||||
} else {
|
||||
assert timer == null;
|
||||
}
|
||||
}
|
||||
|
||||
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
|
||||
METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
|
||||
METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
|
||||
|
|
|
@ -105,18 +105,20 @@ public class TestAlwaysSetScannerId {
|
|||
int nextCallSeq = 0;
|
||||
// test next
|
||||
for (int i = 0; i < COUNT / 2; i++) {
|
||||
req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 1, false,
|
||||
nextCallSeq++, false, false, -1);
|
||||
resp = scan(req);
|
||||
assertTrue(resp.hasScannerId());
|
||||
assertEquals(scannerId, resp.getScannerId());
|
||||
}
|
||||
// test renew
|
||||
req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, true, -1);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 0, false, nextCallSeq++,
|
||||
false, true, -1);
|
||||
resp = scan(req);
|
||||
assertTrue(resp.hasScannerId());
|
||||
assertEquals(scannerId, resp.getScannerId());
|
||||
// test close
|
||||
req = RequestConverter.buildScanRequest(scannerId, 0, true, false);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 0, true, false);
|
||||
resp = scan(req);
|
||||
assertTrue(resp.hasScannerId());
|
||||
assertEquals(scannerId, resp.getScannerId());
|
||||
|
|
|
@ -114,7 +114,8 @@ public class TestScanWithoutFetchingData {
|
|||
int nextCallSeq = 0;
|
||||
// test normal next
|
||||
for (int i = 0; i < COUNT / 2; i++) {
|
||||
req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 1, false,
|
||||
nextCallSeq++, false, false, -1);
|
||||
hrc.reset();
|
||||
resp = scan(hrc, req);
|
||||
assertTrue(resp.getMoreResults());
|
||||
|
@ -124,14 +125,16 @@ public class TestScanWithoutFetchingData {
|
|||
assertResult(i, results[0]);
|
||||
}
|
||||
// test zero next
|
||||
req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, false, -1);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 0, false, nextCallSeq++,
|
||||
false, false, -1);
|
||||
hrc.reset();
|
||||
resp = scan(hrc, req);
|
||||
assertTrue(resp.getMoreResults());
|
||||
assertTrue(resp.getMoreResultsInRegion());
|
||||
assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length);
|
||||
for (int i = COUNT / 2; i < COUNT; i++) {
|
||||
req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 1, false,
|
||||
nextCallSeq++, false, false, -1);
|
||||
hrc.reset();
|
||||
resp = scan(hrc, req);
|
||||
assertTrue(resp.getMoreResults());
|
||||
|
@ -141,7 +144,7 @@ public class TestScanWithoutFetchingData {
|
|||
assertResult(i, results[0]);
|
||||
}
|
||||
// close
|
||||
req = RequestConverter.buildScanRequest(scannerId, 0, true, false);
|
||||
req = RequestConverter.buildScanRequest(HRI.getRegionName(), scannerId, 0, true, false);
|
||||
hrc.reset();
|
||||
resp = scan(hrc, req);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue