HBASE-14693 Add client-side metrics for received pushback signals

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Author: chenheng, 2015-11-05 16:12:41 +08:00; committed by Andrew Purtell
parent 44367f55e8
commit 086bacd12d
3 changed files with 141 additions and 2 deletions
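
For orientation (not part of the commit): the counters added below are only collected when client-side metrics are enabled on the connection, via the same CLIENT_SIDE_METRICS_ENABLED_KEY flag the updated test switches on. A minimal sketch of reading them from application code follows; it assumes a reachable cluster, the class name is made up, and it sits in the org.apache.hadoop.hbase.client package because runnerStats is only @VisibleForTesting.

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch only: enable client-side metrics and read the new runner counters directly.
public class PushbackMetricsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same switch TestClientPushback sets; without it getConnectionMetrics() returns null.
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    try (ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf)) {
      // ... issue enough writes against a loaded region to trigger server pushback ...
      MetricsConnection metrics = conn.getConnectionMetrics();
      System.out.println("normal runners:  " + metrics.runnerStats.normalRunners.count());
      System.out.println("delayed runners: " + metrics.runnerStats.delayRunners.count());
      System.out.println("mean delay (ms): " + metrics.runnerStats.delayIntevalHist.mean());
    }
  }
}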

View File

@@ -1009,6 +1009,9 @@ class AsyncProcess {
int numAttempt) {
// no stats to manage, just do the standard action
if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
if (connection.getConnectionMetrics() != null) {
connection.getConnectionMetrics().incrNormalRunners();
}
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
}
@@ -1039,6 +1042,14 @@ class AsyncProcess {
runner.setRunner(runnable);
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner;
if (connection.getConnectionMetrics() != null) {
connection.getConnectionMetrics().incrDelayRunners();
connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
}
} else {
if (connection.getConnectionMetrics() != null) {
connection.getConnectionMetrics().incrNormalRunners();
}
}
runnable = Trace.wrap(traceText, runnable);
toReturn.add(runnable);
@@ -1267,6 +1278,12 @@
++failed;
}
} else {
if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
AsyncProcess.this.connection.getConnectionMetrics().
updateServerStats(server, regionName, result);
}
// update the stats about the region, if it's a user table. We don't want to slow down
// updates to meta tables, especially from internal updates (master, etc).
if (AsyncProcess.this.connection.getStatisticsTracker() != null) {

View File

@@ -26,12 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.reporting.JmxReporter;
import com.yammer.metrics.util.RatioGauge;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -54,6 +58,8 @@ public class MetricsConnection {
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
private static final String MEMLOAD_BASE = "memstoreLoad_";
private static final String HEAP_BASE = "heapOccupancy_";
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
/** A container class for collecting details about the RPC call as it percolates. */
@@ -130,6 +136,88 @@ public class MetricsConnection {
}
}
protected static class RegionStats {
final String name;
final Histogram memstoreLoadHist;
final Histogram heapOccupancyHist;
public RegionStats(MetricsRegistry registry, String name) {
this.name = name;
this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
MEMLOAD_BASE + this.name);
this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
HEAP_BASE + this.name);
}
public void update(ClientProtos.RegionLoadStats regionStatistics) {
this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
}
}
@VisibleForTesting
protected static class RunnerStats {
final Counter normalRunners;
final Counter delayRunners;
final Histogram delayIntevalHist;
public RunnerStats(MetricsRegistry registry) {
this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount");
this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount");
this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist");
}
public void incrNormalRunners() {
this.normalRunners.inc();
}
public void incrDelayRunners() {
this.delayRunners.inc();
}
public void updateDelayInterval(long interval) {
this.delayIntevalHist.update(interval);
}
}
@VisibleForTesting
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
= new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
public void updateServerStats(ServerName serverName, byte[] regionName,
Object r) {
if (!(r instanceof Result)) {
return;
}
Result result = (Result) r;
ClientProtos.RegionLoadStats stats = result.getStats();
if (stats == null) {
return;
}
String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
ConcurrentMap<byte[], RegionStats> rsStats = null;
if (serverStats.containsKey(serverName)) {
rsStats = serverStats.get(serverName);
} else {
rsStats = serverStats.putIfAbsent(serverName,
new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
if (rsStats == null) {
rsStats = serverStats.get(serverName);
}
}
RegionStats regionStats = null;
if (rsStats.containsKey(regionName)) {
regionStats = rsStats.get(regionName);
} else {
regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
if (regionStats == null) {
regionStats = rsStats.get(regionName);
}
}
regionStats.update(stats);
}
/** A lambda for dispatching to the appropriate metric factory method */
private static interface NewMetric<T> {
T newMetric(Class<?> clazz, String name, String scope);
@@ -172,6 +260,7 @@ public class MetricsConnection {
@VisibleForTesting protected final CallTracker incrementTracker;
@VisibleForTesting protected final CallTracker putTracker;
@VisibleForTesting protected final CallTracker multiTracker;
@VisibleForTesting protected final RunnerStats runnerStats;
// dynamic metrics
@@ -217,6 +306,8 @@ public class MetricsConnection {
this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
this.multiTracker = new CallTracker(this.registry, "Multi", scope);
this.runnerStats = new RunnerStats(this.registry);
this.reporter = new JmxReporter(this.registry);
this.reporter.start();
}
@@ -242,6 +333,21 @@ public class MetricsConnection {
metaCacheMisses.inc();
}
/** Increment the number of normal runner counts. */
public void incrNormalRunners() {
this.runnerStats.incrNormalRunners();
}
/** Increment the number of delay runner counts. */
public void incrDelayRunners() {
this.runnerStats.incrDelayRunners();
}
/** Update delay interval of delay runner. */
public void updateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}
/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
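
To make the naming concrete (not part of the commit): updateServerStats keys the per-region stats by "<server>,<region>", and RegionStats registers two histograms under the memstoreLoad_ / heapOccupancy_ prefixes defined above. A small standalone sketch with made-up server and region values:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch only: shows the metric names updateServerStats/RegionStats derive for a region.
public class RegionStatsNamingSketch {
  public static void main(String[] args) {
    // Hypothetical server and region, purely for illustration.
    ServerName server = ServerName.valueOf("rs1.example.org", 16020, 1446711161000L);
    byte[] regionName = Bytes.toBytes("9a6e26d40293663a79523c58315b930f");

    // Same construction as updateServerStats above.
    String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);

    // One histogram per prefix is registered against MetricsConnection:
    System.out.println("memstoreLoad_" + name);   // memstoreLoad_rs1.example.org,16020,1446711161000,9a6e...
    System.out.println("heapOccupancy_" + name);  // heapOccupancy_rs1.example.org,16020,1446711161000,9a6e...
  }
}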

View File

@@ -42,6 +42,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
@@ -74,7 +75,7 @@ public class TestClientPushback {
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
// ensure we block the flushes when we are double that flushsize
conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
UTIL.startMiniCluster(1);
UTIL.createTable(tableName, family);
}
@@ -87,6 +88,7 @@ public class TestClientPushback {
@Test(timeout=60000)
public void testClientTracksServerPushback() throws Exception{
Configuration conf = UTIL.getConfiguration();
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
HTable table = (HTable) conn.getTable(tableName);
@@ -119,7 +121,6 @@ public class TestClientPushback {
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
assertEquals("We did not find some load on the memstore", load,
regionStats.getMemstoreLoadPercent());
// check that the load reported produces a nonzero delay
long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
assertNotEquals("Reported load does not produce a backoff", backoffTime, 0);
@@ -145,6 +146,21 @@ public class TestClientPushback {
// produces a backoffTime of 151 milliseconds. This is long enough so the
// wait and related checks below are reasonable. Revisit if the backoff
// time reported by above debug logging has significantly deviated.
String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
serverStats.get(server).get(regionName);
assertEquals(name, rsStats.name);
assertEquals(rsStats.heapOccupancyHist.mean(),
(double)regionStats.getHeapOccupancyPercent(), 0.1 );
assertEquals(rsStats.memstoreLoadHist.mean(),
(double)regionStats.getMemstoreLoadPercent(), 0.1);
MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats;
assertEquals(runnerStats.delayRunners.count(), 1);
assertEquals(runnerStats.normalRunners.count(), 1);
assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 0.1);
latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);