HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction
Signed-off-by: Sergey Shelukhin <sershe@apache.org>
This commit is contained in:
parent
050caf425e
commit
d187af00ca
|
@ -74,6 +74,7 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
private long startTime = 0;
|
||||
private long callTimeMs = 0;
|
||||
private int concurrentCallsPerServer = 0;
|
||||
private int numActionsPerServer = 0;
|
||||
|
||||
public long getRequestSizeBytes() {
|
||||
return requestSizeBytes;
|
||||
|
@ -114,6 +115,14 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
public void setConcurrentCallsPerServer(int callsPerServer) {
|
||||
this.concurrentCallsPerServer = callsPerServer;
|
||||
}
|
||||
|
||||
public int getNumActionsPerServer() {
|
||||
return numActionsPerServer;
|
||||
}
|
||||
|
||||
public void setNumActionsPerServer(int numActionsPerServer) {
|
||||
this.numActionsPerServer = numActionsPerServer;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -281,6 +290,7 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
@VisibleForTesting protected final Counter hedgedReadOps;
|
||||
@VisibleForTesting protected final Counter hedgedReadWin;
|
||||
@VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
|
||||
@VisibleForTesting protected final Histogram numActionsPerServerHist;
|
||||
|
||||
// dynamic metrics
|
||||
|
||||
|
@ -337,8 +347,10 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
|
||||
this.multiTracker = new CallTracker(this.registry, "Multi", scope);
|
||||
this.runnerStats = new RunnerStats(this.registry);
|
||||
this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
|
||||
this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
|
||||
"concurrentCallsPerServer", scope));
|
||||
this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
|
||||
"numActionsPerServer", scope));
|
||||
|
||||
this.reporter = JmxReporter.forRegistry(this.registry).build();
|
||||
this.reporter.start();
|
||||
|
@ -442,59 +454,60 @@ public class MetricsConnection implements StatisticTrackable {
|
|||
// if we could dispatch based on something static, ie, request Message type.
|
||||
if (method.getService() == ClientService.getDescriptor()) {
|
||||
switch(method.getIndex()) {
|
||||
case 0:
|
||||
assert "Get".equals(method.getName());
|
||||
getTracker.updateRpc(stats);
|
||||
return;
|
||||
case 1:
|
||||
assert "Mutate".equals(method.getName());
|
||||
final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
|
||||
switch(mutationType) {
|
||||
case APPEND:
|
||||
appendTracker.updateRpc(stats);
|
||||
case 0:
|
||||
assert "Get".equals(method.getName());
|
||||
getTracker.updateRpc(stats);
|
||||
return;
|
||||
case DELETE:
|
||||
deleteTracker.updateRpc(stats);
|
||||
case 1:
|
||||
assert "Mutate".equals(method.getName());
|
||||
final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
|
||||
switch(mutationType) {
|
||||
case APPEND:
|
||||
appendTracker.updateRpc(stats);
|
||||
return;
|
||||
case DELETE:
|
||||
deleteTracker.updateRpc(stats);
|
||||
return;
|
||||
case INCREMENT:
|
||||
incrementTracker.updateRpc(stats);
|
||||
return;
|
||||
case PUT:
|
||||
putTracker.updateRpc(stats);
|
||||
return;
|
||||
default:
|
||||
throw new RuntimeException("Unrecognized mutation type " + mutationType);
|
||||
}
|
||||
case 2:
|
||||
assert "Scan".equals(method.getName());
|
||||
scanTracker.updateRpc(stats);
|
||||
return;
|
||||
case INCREMENT:
|
||||
incrementTracker.updateRpc(stats);
|
||||
return;
|
||||
case PUT:
|
||||
putTracker.updateRpc(stats);
|
||||
case 3:
|
||||
assert "BulkLoadHFile".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 4:
|
||||
assert "PrepareBulkLoad".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 5:
|
||||
assert "CleanupBulkLoad".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 6:
|
||||
assert "ExecService".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 7:
|
||||
assert "ExecRegionServerService".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 8:
|
||||
assert "Multi".equals(method.getName());
|
||||
numActionsPerServerHist.update(stats.getNumActionsPerServer());
|
||||
multiTracker.updateRpc(stats);
|
||||
return;
|
||||
default:
|
||||
throw new RuntimeException("Unrecognized mutation type " + mutationType);
|
||||
}
|
||||
case 2:
|
||||
assert "Scan".equals(method.getName());
|
||||
scanTracker.updateRpc(stats);
|
||||
return;
|
||||
case 3:
|
||||
assert "BulkLoadHFile".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 4:
|
||||
assert "PrepareBulkLoad".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 5:
|
||||
assert "CleanupBulkLoad".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 6:
|
||||
assert "ExecService".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 7:
|
||||
assert "ExecRegionServerService".equals(method.getName());
|
||||
// use generic implementation
|
||||
break;
|
||||
case 8:
|
||||
assert "Multi".equals(method.getName());
|
||||
multiTracker.updateRpc(stats);
|
||||
return;
|
||||
default:
|
||||
throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
|
||||
throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
|
||||
}
|
||||
}
|
||||
// Fallback to dynamic registry lookup for DDL methods.
|
||||
|
|
|
@ -70,6 +70,8 @@ import org.apache.hadoop.ipc.RemoteException;
|
|||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
* Provides the basics for a RpcClient implementation like configuration and Logging.
|
||||
* <p>
|
||||
|
@ -401,6 +403,17 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
final RpcCallback<Message> callback) {
|
||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||
|
||||
if (param instanceof ClientProtos.MultiRequest) {
|
||||
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
|
||||
int numActions = 0;
|
||||
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
|
||||
numActions += regionAction.getActionCount();
|
||||
}
|
||||
|
||||
cs.setNumActionsPerServer(numActions);
|
||||
}
|
||||
|
||||
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
|
||||
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
|
||||
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ SmallTests.class, ClientTests.class })
|
||||
public class TestMultiActionMetricsFromClient {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMultiActionMetricsFromClient.class);
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("test_table");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("fam1");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
|
||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiMetrics() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
|
||||
ConnectionImplementation conn =
|
||||
(ConnectionImplementation) ConnectionFactory.createConnection(conf);
|
||||
|
||||
try {
|
||||
BufferedMutator mutator = conn.getBufferedMutator(TABLE_NAME);
|
||||
byte[][] keys = {Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), Bytes.toBytes("zzz")};
|
||||
for (byte[] key : keys) {
|
||||
Put p = new Put(key);
|
||||
p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
|
||||
mutator.mutate(p);
|
||||
}
|
||||
|
||||
mutator.flush();
|
||||
mutator.close();
|
||||
|
||||
MetricsConnection metrics = conn.getConnectionMetrics();
|
||||
assertEquals(1, metrics.multiTracker.reqHist.getCount());
|
||||
assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15);
|
||||
assertEquals(1, metrics.numActionsPerServerHist.getCount());
|
||||
} finally {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue