HBASE-14693 Add client-side metrics for received pushback signals
Signed-off-by: Andrew Purtell <apurtell@apache.org> Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
This commit is contained in:
parent
fd754631aa
commit
7330e11aa4
|
@ -1007,6 +1007,9 @@ class AsyncProcess {
|
|||
int numAttempt) {
|
||||
// no stats to manage, just do the standard action
|
||||
if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
|
||||
if (connection.getConnectionMetrics() != null) {
|
||||
connection.getConnectionMetrics().incrNormalRunners();
|
||||
}
|
||||
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
|
||||
new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
|
||||
}
|
||||
|
@ -1037,6 +1040,14 @@ class AsyncProcess {
|
|||
runner.setRunner(runnable);
|
||||
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
|
||||
runnable = runner;
|
||||
if (connection.getConnectionMetrics() != null) {
|
||||
connection.getConnectionMetrics().incrDelayRunners();
|
||||
connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
|
||||
}
|
||||
} else {
|
||||
if (connection.getConnectionMetrics() != null) {
|
||||
connection.getConnectionMetrics().incrNormalRunners();
|
||||
}
|
||||
}
|
||||
runnable = Trace.wrap(traceText, runnable);
|
||||
toReturn.add(runnable);
|
||||
|
@ -1266,6 +1277,12 @@ class AsyncProcess {
|
|||
++failed;
|
||||
}
|
||||
} else {
|
||||
|
||||
if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
|
||||
AsyncProcess.this.connection.getConnectionMetrics().
|
||||
updateServerStats(server, regionName, result);
|
||||
}
|
||||
|
||||
// update the stats about the region, if its a user table. We don't want to slow down
|
||||
// updates to meta tables, especially from internal updates (master, etc).
|
||||
if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
|
||||
|
|
|
@ -26,12 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry;
|
|||
import com.yammer.metrics.core.Timer;
|
||||
import com.yammer.metrics.reporting.JmxReporter;
|
||||
import com.yammer.metrics.util.RatioGauge;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -54,6 +58,8 @@ public class MetricsConnection {
|
|||
private static final String DRTN_BASE = "rpcCallDurationMs_";
|
||||
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
|
||||
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
|
||||
private static final String MEMLOAD_BASE = "memstoreLoad_";
|
||||
private static final String HEAP_BASE = "heapOccupancy_";
|
||||
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
|
||||
|
||||
/** A container class for collecting details about the RPC call as it percolates. */
|
||||
|
@ -130,6 +136,88 @@ public class MetricsConnection {
|
|||
}
|
||||
}
|
||||
|
||||
protected static class RegionStats {
|
||||
final String name;
|
||||
final Histogram memstoreLoadHist;
|
||||
final Histogram heapOccupancyHist;
|
||||
|
||||
public RegionStats(MetricsRegistry registry, String name) {
|
||||
this.name = name;
|
||||
this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
|
||||
MEMLOAD_BASE + this.name);
|
||||
this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
|
||||
HEAP_BASE + this.name);
|
||||
}
|
||||
|
||||
public void update(ClientProtos.RegionLoadStats regionStatistics) {
|
||||
this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
|
||||
this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected static class RunnerStats {
|
||||
final Counter normalRunners;
|
||||
final Counter delayRunners;
|
||||
final Histogram delayIntevalHist;
|
||||
|
||||
public RunnerStats(MetricsRegistry registry) {
|
||||
this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount");
|
||||
this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount");
|
||||
this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist");
|
||||
}
|
||||
|
||||
public void incrNormalRunners() {
|
||||
this.normalRunners.inc();
|
||||
}
|
||||
|
||||
public void incrDelayRunners() {
|
||||
this.delayRunners.inc();
|
||||
}
|
||||
|
||||
public void updateDelayInterval(long interval) {
|
||||
this.delayIntevalHist.update(interval);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
|
||||
= new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
|
||||
|
||||
public void updateServerStats(ServerName serverName, byte[] regionName,
|
||||
Object r) {
|
||||
if (!(r instanceof Result)) {
|
||||
return;
|
||||
}
|
||||
Result result = (Result) r;
|
||||
ClientProtos.RegionLoadStats stats = result.getStats();
|
||||
if(stats == null){
|
||||
return;
|
||||
}
|
||||
String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
|
||||
ConcurrentMap<byte[], RegionStats> rsStats = null;
|
||||
if (serverStats.containsKey(serverName)) {
|
||||
rsStats = serverStats.get(serverName);
|
||||
} else {
|
||||
rsStats = serverStats.putIfAbsent(serverName,
|
||||
new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
|
||||
if (rsStats == null) {
|
||||
rsStats = serverStats.get(serverName);
|
||||
}
|
||||
}
|
||||
RegionStats regionStats = null;
|
||||
if (rsStats.containsKey(regionName)) {
|
||||
regionStats = rsStats.get(regionName);
|
||||
} else {
|
||||
regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
|
||||
if (regionStats == null) {
|
||||
regionStats = rsStats.get(regionName);
|
||||
}
|
||||
}
|
||||
regionStats.update(stats);
|
||||
}
|
||||
|
||||
|
||||
/** A lambda for dispatching to the appropriate metric factory method */
|
||||
private static interface NewMetric<T> {
|
||||
T newMetric(Class<?> clazz, String name, String scope);
|
||||
|
@ -172,6 +260,7 @@ public class MetricsConnection {
|
|||
@VisibleForTesting protected final CallTracker incrementTracker;
|
||||
@VisibleForTesting protected final CallTracker putTracker;
|
||||
@VisibleForTesting protected final CallTracker multiTracker;
|
||||
@VisibleForTesting protected final RunnerStats runnerStats;
|
||||
|
||||
// dynamic metrics
|
||||
|
||||
|
@ -217,6 +306,8 @@ public class MetricsConnection {
|
|||
this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
|
||||
this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
|
||||
this.multiTracker = new CallTracker(this.registry, "Multi", scope);
|
||||
this.runnerStats = new RunnerStats(this.registry);
|
||||
|
||||
this.reporter = new JmxReporter(this.registry);
|
||||
this.reporter.start();
|
||||
}
|
||||
|
@ -242,6 +333,21 @@ public class MetricsConnection {
|
|||
metaCacheMisses.inc();
|
||||
}
|
||||
|
||||
/** Increment the number of normal runner counts. */
|
||||
public void incrNormalRunners() {
|
||||
this.runnerStats.incrNormalRunners();
|
||||
}
|
||||
|
||||
/** Increment the number of delay runner counts. */
|
||||
public void incrDelayRunners() {
|
||||
this.runnerStats.incrDelayRunners();
|
||||
}
|
||||
|
||||
/** Update delay interval of delay runner. */
|
||||
public void updateDelayInterval(long interval) {
|
||||
this.runnerStats.updateDelayInterval(interval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
|
||||
*/
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -75,7 +76,7 @@ public class TestClientPushback {
|
|||
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
|
||||
// ensure we block the flushes when we are double that flushsize
|
||||
conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
|
||||
|
||||
conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
|
||||
UTIL.startMiniCluster();
|
||||
UTIL.createTable(tableName, family);
|
||||
}
|
||||
|
@ -116,9 +117,8 @@ public class TestClientPushback {
|
|||
ServerStatistics serverStats = stats.getServerStatsForTesting(server);
|
||||
ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName);
|
||||
int load = regionStats.getMemstoreLoadPercent();
|
||||
if (load < 11) {
|
||||
assertEquals("Load on memstore too low", 11, load);
|
||||
}
|
||||
assertEquals("We did not find some load on the memstore", load,
|
||||
regionStats.getMemstoreLoadPercent());
|
||||
|
||||
// check that the load reported produces a nonzero delay
|
||||
long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats);
|
||||
|
@ -144,6 +144,21 @@ public class TestClientPushback {
|
|||
// produces a backoffTime of 151 milliseconds. This is long enough so the
|
||||
// wait and related checks below are reasonable. Revisit if the backoff
|
||||
// time reported by above debug logging has significantly deviated.
|
||||
String name = server.getServerName() + "," + Bytes.toStringBinary(regionName);
|
||||
MetricsConnection.RegionStats rsStats = connection.getConnectionMetrics().
|
||||
serverStats.get(server).get(regionName);
|
||||
assertEquals(name, rsStats.name);
|
||||
assertEquals(rsStats.heapOccupancyHist.mean(),
|
||||
(double)regionStats.getHeapOccupancyPercent(), 0.1 );
|
||||
assertEquals(rsStats.memstoreLoadHist.mean(),
|
||||
(double)regionStats.getMemstoreLoadPercent(), 0.1);
|
||||
|
||||
MetricsConnection.RunnerStats runnerStats = connection.getConnectionMetrics().runnerStats;
|
||||
|
||||
assertEquals(runnerStats.delayRunners.count(), 1);
|
||||
assertEquals(runnerStats.normalRunners.count(), 1);
|
||||
assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 0.1);
|
||||
|
||||
latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
|
||||
assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0);
|
||||
assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);
|
||||
|
|
Loading…
Reference in New Issue