diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9573af9876b..acd81428bde 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -118,6 +118,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10208. Remove duplicate initialization in StringUtils.getStringCollection. (Benoy Antony via jing9) + HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time. + (Liang Xie via wang) + OPTIMIZATIONS HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 3c3da625739..b12a0fb8b29 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -242,4 +242,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS = "hadoop.user.group.metrics.percentiles.intervals"; + + public static final String RPC_METRICS_QUANTILE_ENABLE = + "rpc.metrics.quantile.enable"; + public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY = + "rpc.metrics.percentiles.intervals"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 50b8b49b7d6..413278a281c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2107,7 +2107,7 @@ public abstract class Server { listener = new Listener(); this.port = listener.getAddress().getPort(); connectionManager = new ConnectionManager(); - this.rpcMetrics = RpcMetrics.create(this); + this.rpcMetrics = RpcMetrics.create(this, conf); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); this.tcpNoDelay = conf.getBoolean( CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 56bfbf7684b..54b02277899 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -19,14 +19,17 @@ package org.apache.hadoop.ipc.metrics; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableRate; /** @@ -41,26 +44,48 @@ public class RpcMetrics { final Server server; final MetricsRegistry registry; final String name; + final boolean rpcQuantileEnable; - RpcMetrics(Server server) { + RpcMetrics(Server server, Configuration conf) { String port = String.valueOf(server.getListenerAddress().getPort()); - name = "RpcActivityForPort"+ port; + name = "RpcActivityForPort" + port; this.server = server; registry = new MetricsRegistry("rpc").tag("port", "RPC port", port); - LOG.debug("Initialized "+ registry); + int[] intervals = conf.getInts( + CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY); + rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean( + CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, false); + if (rpcQuantileEnable) { + rpcQueueTimeMillisQuantiles = + new MutableQuantiles[intervals.length]; + rpcProcessingTimeMillisQuantiles = + new MutableQuantiles[intervals.length]; + for (int i = 0; i < intervals.length; i++) { + int interval = intervals[i]; + rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime" + + interval + "s", "rpc queue time in milli second", "ops", + "latency", interval); + rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles( + "rpcProcessingTime" + interval + "s", + "rpc processing time in milli second", "ops", "latency", interval); + } + } + LOG.debug("Initialized " + registry); } public String name() { return name; } - public static RpcMetrics create(Server server) { - RpcMetrics m = new RpcMetrics(server); + public static RpcMetrics create(Server server, Configuration conf) { + RpcMetrics m = new RpcMetrics(server, conf); return DefaultMetricsSystem.instance().register(m.name, null, m); } @Metric("Number of received bytes") MutableCounterLong receivedBytes; @Metric("Number of sent bytes") MutableCounterLong sentBytes; @Metric("Queue time") MutableRate rpcQueueTime; + MutableQuantiles[] rpcQueueTimeMillisQuantiles; @Metric("Processsing time") MutableRate rpcProcessingTime; + MutableQuantiles[] rpcProcessingTimeMillisQuantiles; @Metric("Number of authentication failures") MutableCounterInt rpcAuthenticationFailures; @Metric("Number of authentication successes") @@ -146,6 +171,11 @@ public class RpcMetrics { //@Override public void addRpcQueueTime(int qTime) { rpcQueueTime.add(qTime); + if (rpcQuantileEnable) { + for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) { + q.add(qTime); + } + } } /** @@ -155,5 +185,10 @@ public class RpcMetrics { //@Override public void addRpcProcessingTime(int processingTime) { rpcProcessingTime.add(processingTime); + if (rpcQuantileEnable) { + for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) { + q.add(processingTime); + } + } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 572557cc41a..c5300ba87b5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ipc; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; @@ -67,6 +68,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MockitoUtil; import org.junit.Before; import org.junit.Test; @@ -957,6 +959,44 @@ public class TestRPC { } } + @Test + public void testRpcMetrics() throws Exception { + Configuration configuration = new Configuration(); + final int interval = 1; + configuration.setBoolean(CommonConfigurationKeys. + RPC_METRICS_QUANTILE_ENABLE, true); + configuration.set(CommonConfigurationKeys. + RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); + final Server server = new RPC.Builder(configuration) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .build(); + server.start(); + final TestProtocol proxy = RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, server.getListenerAddress(), configuration); + try { + for (int i=0; i<1000; i++) { + proxy.ping(); + proxy.echo("" + i); + } + MetricsRecordBuilder rpcMetrics = + getMetrics(server.getRpcMetrics().name()); + assertTrue("Expected non-zero rpc queue time", + getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0); + assertTrue("Expected non-zero rpc processing time", + getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0); + MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s", + rpcMetrics); + MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", + rpcMetrics); + } finally { + if (proxy != null) { + RPC.stopProxy(proxy); + } + server.stop(); + } + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf);