HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit e1d00addb5)
This commit is contained in:
Viraj Jasani 2021-07-20 12:25:49 +05:30 committed by Wei-Chiu Chuang
parent 177569f1af
commit ec3311975c
No known key found for this signature in database
GPG Key ID: B362E1C021854B9D
8 changed files with 133 additions and 21 deletions

View File

@ -379,6 +379,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
"rpc.metrics.percentiles.intervals";
public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";
/** Allowed hosts for nfs exports */
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";

View File

@ -178,6 +178,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
private final TimeUnit metricsTimeUnit;
private MetricsProxy metricsProxy;
private final CostProvider costProvider;
@ -249,6 +250,8 @@ public class DecayRpcScheduler implements RpcScheduler,
DecayRpcSchedulerDetailedMetrics.create(ns);
decayRpcSchedulerDetailedMetrics.init(numLevels);
metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf);
// Setup delay timer
Timer timer = new Timer(true);
DecayTask task = new DecayTask(this, timer);
@ -676,8 +679,9 @@ public class DecayRpcScheduler implements RpcScheduler,
addCost(user, processingCost);
int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, metricsTimeUnit);
long processingTime = details.get(Timing.PROCESSING,
metricsTimeUnit);
this.decayRpcSchedulerDetailedMetrics.addQueueTime(
priorityLevel, queueTime);

View File

@ -62,10 +62,10 @@ public interface RpcScheduler {
// this interface, a default implementation is supplied which uses the old
// method. All new implementations MUST override this interface and should
// NOT use the other addResponseTime method.
int queueTime = (int)
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
int processingTime = (int)
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
addResponseTime(callName, schedulable.getPriorityLevel(),
queueTime, processingTime);
}

View File

@ -542,13 +542,13 @@ public abstract class Server {
(rpcMetrics.getProcessingStdDev() * deviation);
long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
methodName, processingTime, RpcMetrics.TIMEUNIT, call,
methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
details.toString());
rpcMetrics.incrSlowRpc();
}
@ -568,7 +568,7 @@ public abstract class Server {
deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos);
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcQueueTime(queueTime);
if (call.isResponseDeferred() || connDropped) {
@ -577,9 +577,9 @@ public abstract class Server {
}
long processingTime =
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
long waitTime =
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
// don't include lock wait for detailed metrics.

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ipc.metrics;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
@ -48,8 +49,11 @@ public class RpcMetrics {
final MetricsRegistry registry;
final String name;
final boolean rpcQuantileEnable;
public static final TimeUnit DEFAULT_METRIC_TIME_UNIT =
TimeUnit.MILLISECONDS;
/** The time unit used when storing/accessing time durations. */
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
private final TimeUnit metricsTimeUnit;
RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort());
@ -63,6 +67,7 @@ public class RpcMetrics {
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
metricsTimeUnit = getMetricsTimeUnit(conf);
if (rpcQuantileEnable) {
rpcQueueTimeQuantiles =
new MutableQuantiles[intervals.length];
@ -75,19 +80,19 @@ public class RpcMetrics {
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+ interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
"rpcLockWaitTime" + interval + "s",
"rpc lock wait time in " + TIMEUNIT, "ops",
"rpc lock wait time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
"rpc processing time in " + TIMEUNIT, "ops",
"rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"deferredRpcProcessingTime" + interval + "s",
"deferred rpc processing time in " + TIMEUNIT, "ops",
"deferred rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
}
}
@ -141,6 +146,27 @@ public class RpcMetrics {
return server.getNumDroppedConnections();
}
/**
 * Returns the time unit this instance resolved from its configuration and
 * uses in the descriptions of its RPC time quantiles.
 *
 * @return the value of the {@code metricsTimeUnit} field
 */
public TimeUnit getMetricsTimeUnit() {
  return this.metricsTimeUnit;
}
/**
 * Resolves the time unit for RPC metrics from the given configuration.
 *
 * <p>Reads {@link CommonConfigurationKeys#RPC_METRICS_TIME_UNIT}. The value
 * is trimmed and upper-cased before it is matched against the constants of
 * {@link TimeUnit}, so {@code "milliseconds"} or {@code " NANOSECONDS "} are
 * accepted in addition to the exact enum names.
 *
 * @param conf configuration to read the time unit from
 * @return the configured time unit, or {@link #DEFAULT_METRIC_TIME_UNIT}
 *         when the key is unset, empty, or does not name a
 *         {@link TimeUnit} constant
 */
public static TimeUnit getMetricsTimeUnit(Configuration conf) {
  TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT;
  String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
  if (StringUtils.isNotEmpty(timeunit)) {
    // TimeUnit.valueOf is an exact, case-sensitive match; normalize first so
    // that harmless differences in case or padding are not rejected.
    // Locale.ROOT keeps the upper-casing independent of the JVM locale.
    String normalized =
        timeunit.trim().toUpperCase(java.util.Locale.ROOT);
    try {
      metricsTimeUnit = TimeUnit.valueOf(normalized);
    } catch (IllegalArgumentException e) {
      // Deliberate best-effort: a bad value must not stop the server, so
      // report it (with the raw configured value) and keep the default.
      LOG.info("Config key {} 's value {} does not correspond to enum values"
              + " of java.util.concurrent.TimeUnit. Hence default unit"
              + " {} will be used",
          CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit,
          RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
    }
  }
  return metricsTimeUnit;
}
// Public instrumentation methods that could be extracted to an
// abstract class if we decide to do custom instrumentation classes a la
// JobTrackerInstrumentation. The methods with //@Override comment are

View File

@ -3293,6 +3293,21 @@
</description>
</property>
<property>
<name>rpc.metrics.timeunit</name>
<value>MILLISECONDS</value>
<description>
This property configures the time unit used for various RPC metrics,
e.g. rpcQueueTime, rpcLockWaitTime, rpcProcessingTime and
deferredRpcProcessingTime. If this property is not set, the default
time unit is milliseconds.
The value must be the name of one of the constants of the enum
java.util.concurrent.TimeUnit; a value that is not a valid constant
name is ignored and the default is used instead.
Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
SECONDS, etc.
</description>
</property>
<property>
<name>rpc.metrics.percentiles.intervals</name>
<value></value>

View File

@ -65,6 +65,8 @@ rpc
---
Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
The `rpc.metrics.timeunit` configuration property sets the time unit used for RPC metrics.
The default time unit is milliseconds, which is the unit assumed in the descriptions below.
| Name | Description |
|:---- |:---- |

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
@ -1098,8 +1097,8 @@ public class TestRPC extends TestRpcBase {
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
rpcMetrics);
(double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
TimeUnit.SECONDS)), rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
@ -1603,6 +1602,70 @@ public class TestRPC extends TestRpcBase {
assertTrue(rpcEngine instanceof StoppedRpcEngine);
}
/**
 * Starts a test server with {@code rpc.metrics.timeunit} set to NANOSECONDS
 * and quantiles enabled, issues calls from two users, and checks the
 * lock-wait, queue-time and processing-time gauges against values expressed
 * in the server's configured metrics time unit.
 */
@Test
public void testRpcMetricsInNanos() throws Exception {
  final int quantileInterval = 1;
  conf.setBoolean(CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, true);
  conf.set(CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY,
      "" + quantileInterval);
  conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");
  final Server server = setupTestServer(conf, 5);

  // Second client, connected as a different (remote) user.
  UserGroupInformation remoteUgi =
      UserGroupInformation.createRemoteUser("testUserInNanos");
  PrivilegedAction<TestRpcService> openProxy = () -> {
    try {
      return RPC.getProxy(TestRpcService.class, 0,
          server.getListenerAddress(), conf);
    } catch (IOException e) {
      LOG.error("Something went wrong.", e);
    }
    return null;
  };
  TestRpcService remoteUserProxy = remoteUgi.doAs(openProxy);

  TestRpcService proxy = null;
  try {
    proxy = getClient(addr, conf);
    for (int call = 0; call < 100; call++) {
      String payload = "" + call;
      proxy.ping(null, newEmptyRequest());
      proxy.echo(null, newEchoRequest(payload));
      remoteUserProxy.echo(null, newEchoRequest(payload));
    }

    // No lock has been taken yet, so the average lock wait must be zero.
    MetricsRecordBuilder snapshot =
        getMetrics(server.getRpcMetrics().name());
    double lockWaitAvg = getDoubleGauge("RpcLockWaitTimeAvgTime", snapshot);
    assertEquals("Expected zero rpc lock wait time", 0, lockWaitAvg, 0.001);
    MetricsAsserts.assertQuantileGauges(
        "RpcQueueTime" + quantileInterval + "s", snapshot);
    MetricsAsserts.assertQuantileGauges(
        "RpcProcessingTime" + quantileInterval + "s", snapshot);

    proxy.lockAndSleep(null, newSleepRequest(5));
    snapshot = getMetrics(server.getRpcMetrics().name());

    // Expected lock wait is 10 seconds, converted to the server's unit.
    TimeUnit serverUnit = server.getRpcMetrics().getMetricsTimeUnit();
    double expectedLockWait = (double) serverUnit.convert(10L,
        TimeUnit.SECONDS);
    assertGauge("RpcLockWaitTimeAvgTime", expectedLockWait, snapshot);

    LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
        getDoubleGauge("RpcProcessingTimeAvgTime", snapshot),
        getDoubleGauge("RpcQueueTimeAvgTime", snapshot));

    double processingAvg =
        getDoubleGauge("RpcProcessingTimeAvgTime", snapshot);
    assertTrue(processingAvg > 4000000D);
    double queueAvg = getDoubleGauge("RpcQueueTimeAvgTime", snapshot);
    assertTrue(queueAvg > 4000D);
  } finally {
    if (remoteUserProxy != null) {
      RPC.stopProxy(remoteUserProxy);
    }
    stop(server, proxy);
  }
}
/**
 * Command-line entry point: creates a {@code TestRPC} instance and runs
 * {@code testCallsInternal} with the class-level {@code conf}.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception if {@code testCallsInternal} throws
 */
public static void main(String[] args) throws Exception {
  TestRPC instance = new TestRPC();
  instance.testCallsInternal(conf);
}