diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 7c7fc3e67db..f1fa3eb3876 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1009,6 +1009,9 @@ class AsyncProcess { int numAttempt) { // no stats to manage, just do the standard action if (AsyncProcess.this.connection.getStatisticsTracker() == null) { + if (connection.getConnectionMetrics() != null) { + connection.getConnectionMetrics().incrNormalRunners(); + } return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); } @@ -1039,6 +1042,14 @@ class AsyncProcess { runner.setRunner(runnable); traceText = "AsyncProcess.clientBackoff.sendMultiAction"; runnable = runner; + if (connection.getConnectionMetrics() != null) { + connection.getConnectionMetrics().incrDelayRunners(); + connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime()); + } + } else { + if (connection.getConnectionMetrics() != null) { + connection.getConnectionMetrics().incrNormalRunners(); + } } runnable = Trace.wrap(traceText, runnable); toReturn.add(runnable); @@ -1267,6 +1278,12 @@ class AsyncProcess { ++failed; } } else { + + if (AsyncProcess.this.connection.getConnectionMetrics() != null) { + AsyncProcess.this.connection.getConnectionMetrics(). + updateServerStats(server, regionName, result); + } + // update the stats about the region, if its a user table. We don't want to slow down // updates to meta tables, especially from internal updates (master, etc). if (AsyncProcess.this.connection.getStatisticsTracker() != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index f34fb8ae1e8..3863c37cd32 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -26,12 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Timer; import com.yammer.metrics.reporting.JmxReporter; import com.yammer.metrics.util.RatioGauge; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.util.Bytes; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +58,8 @@ public class MetricsConnection { private static final String DRTN_BASE = "rpcCallDurationMs_"; private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; + private static final String MEMLOAD_BASE = "memstoreLoad_"; + private static final String HEAP_BASE = "heapOccupancy_"; private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); /** A container class for collecting details about the RPC call as it percolates. */ @@ -130,6 +136,88 @@ public class MetricsConnection { } } + protected static class RegionStats { + final String name; + final Histogram memstoreLoadHist; + final Histogram heapOccupancyHist; + + public RegionStats(MetricsRegistry registry, String name) { + this.name = name; + this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class, + MEMLOAD_BASE + this.name); + this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class, + HEAP_BASE + this.name); + } + + public void update(ClientProtos.RegionLoadStats regionStatistics) { + this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad()); + this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy()); + } + } + + @VisibleForTesting + protected static class RunnerStats { + final Counter normalRunners; + final Counter delayRunners; + final Histogram delayIntevalHist; + + public RunnerStats(MetricsRegistry registry) { + this.normalRunners = registry.newCounter(MetricsConnection.class, "normalRunnersCount"); + this.delayRunners = registry.newCounter(MetricsConnection.class, "delayRunnersCount"); + this.delayIntevalHist = registry.newHistogram(MetricsConnection.class, "delayIntervalHist"); + } + + public void incrNormalRunners() { + this.normalRunners.inc(); + } + + public void incrDelayRunners() { + this.delayRunners.inc(); + } + + public void updateDelayInterval(long interval) { + this.delayIntevalHist.update(interval); + } + } + + @VisibleForTesting + protected ConcurrentHashMap> serverStats + = new ConcurrentHashMap>(); + + public void updateServerStats(ServerName serverName, byte[] regionName, + Object r) { + if (!(r instanceof Result)) { + return; + } + Result result = (Result) r; + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return; + } + String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); + ConcurrentMap rsStats = null; + if (serverStats.containsKey(serverName)) { + rsStats = serverStats.get(serverName); + } else { + rsStats = serverStats.putIfAbsent(serverName, + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)); + if (rsStats == null) { + rsStats = serverStats.get(serverName); + } + } + RegionStats regionStats = null; + if (rsStats.containsKey(regionName)) { + regionStats = rsStats.get(regionName); + } else { + regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name)); + if (regionStats == null) { + regionStats = rsStats.get(regionName); + } + } + regionStats.update(stats); + } + + /** A lambda for dispatching to the appropriate metric factory method */ private static interface NewMetric { T newMetric(Class clazz, String name, String scope); @@ -172,6 +260,7 @@ public class MetricsConnection { @VisibleForTesting protected final CallTracker incrementTracker; @VisibleForTesting protected final CallTracker putTracker; @VisibleForTesting protected final CallTracker multiTracker; + @VisibleForTesting protected final RunnerStats runnerStats; // dynamic metrics @@ -217,6 +306,8 @@ public class MetricsConnection { this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); this.multiTracker = new CallTracker(this.registry, "Multi", scope); + this.runnerStats = new RunnerStats(this.registry); + this.reporter = new JmxReporter(this.registry); this.reporter.start(); } @@ -242,6 +333,21 @@ public class MetricsConnection { metaCacheMisses.inc(); } + /** Increment the number of normal runner counts. */ + public void incrNormalRunners() { + this.runnerStats.incrNormalRunners(); + } + + /** Increment the number of delay runner counts. */ + public void incrDelayRunners() { + this.runnerStats.incrDelayRunners(); + } + + /** Update delay interval of delay runner. */ + public void updateDelayInterval(long interval) { + this.runnerStats.updateDelayInterval(interval); + } + /** * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 995e3e505cc..1efbe05fac1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -42,6 +42,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; @@ -74,7 +75,7 @@ public class TestClientPushback { conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); // ensure we block the flushes when we are double that flushsize conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); - + conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true); UTIL.startMiniCluster(1); UTIL.createTable(tableName, family); } @@ -87,6 +88,7 @@ public class TestClientPushback { @Test(timeout=60000) public void testClientTracksServerPushback() throws Exception{ Configuration conf = UTIL.getConfiguration(); + ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf); HTable table = (HTable) conn.getTable(tableName); @@ -119,7 +121,6 @@ public class TestClientPushback { ServerStatistics.RegionStatistics regionStats = serverStats.getStatsForRegion(regionName); assertEquals("We did not find some load on the memstore", load, regionStats.getMemstoreLoadPercent()); - // check that the load reported produces a nonzero delay long backoffTime = backoffPolicy.getBackoffTime(server, regionName, serverStats); assertNotEquals("Reported load does not produce a backoff", backoffTime, 0); @@ -145,6 +146,21 @@ public class TestClientPushback { // produces a backoffTime of 151 milliseconds. This is long enough so the // wait and related checks below are reasonable. Revisit if the backoff // time reported by above debug logging has significantly deviated. + String name = server.getServerName() + "," + Bytes.toStringBinary(regionName); + MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics(). + serverStats.get(server).get(regionName); + assertEquals(name, rsStats.name); + assertEquals(rsStats.heapOccupancyHist.mean(), + (double)regionStats.getHeapOccupancyPercent(), 0.1 ); + assertEquals(rsStats.memstoreLoadHist.mean(), + (double)regionStats.getMemstoreLoadPercent(), 0.1); + + MetricsConnection.RunnerStats runnerStats = conn.getConnectionMetrics().runnerStats; + + assertEquals(runnerStats.delayRunners.count(), 1); + assertEquals(runnerStats.normalRunners.count(), 1); + assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 0.1); + latch.await(backoffTime * 2, TimeUnit.MILLISECONDS); assertNotEquals("AsyncProcess did not submit the work time", endTime.get(), 0); assertTrue("AsyncProcess did not delay long enough", endTime.get() - startTime >= backoffTime);