HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4485)
Signed-off-by: Tao Li <tomscut@apache.org>
parent d41e0a9cc3
commit 4ba463069b
CommonConfigurationKeysPublic.java

@@ -1028,5 +1028,13 @@ public class CommonConfigurationKeysPublic {
   public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
       "hadoop.http.idle_timeout.ms";
   public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;
+
+  /**
+   * To configure scheduling of server metrics update thread. This config is used to indicate
+   * initial delay and delay between each execution of the metric update runnable thread.
+   */
+  public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL =
+      "ipc.server.metrics.update.runner.interval";
+  public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
 }
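Aside, not part of this diff: the new key can be tuned like any other IPC setting. A minimal sketch, assuming a plain `Configuration` object; the class name and the 10-second value are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;

public class MetricsUpdaterIntervalExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the 5000 ms default introduced above; 10000 ms is an arbitrary example value.
    conf.setLong("ipc.server.metrics.update.runner.interval", 10000L);
    // The server reads the key back with conf.getLong(...), falling back to the default.
    long interval = conf.getLong("ipc.server.metrics.update.runner.interval", 5000L);
    System.out.println("metrics updater interval (ms): " + interval);
  }
}
```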
Server.java

@@ -65,9 +65,12 @@ import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 import javax.security.sasl.Sasl;
@@ -127,6 +130,7 @@ import org.apache.hadoop.tracing.Tracer;
 import org.apache.hadoop.tracing.TraceUtils;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
 import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
 import org.apache.hadoop.thirdparty.protobuf.Message;
@@ -499,6 +503,11 @@ public abstract class Server {
   private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
+  private final LongAdder totalRequests = new LongAdder();
+  private long lastSeenTotalRequests = 0;
+  private long totalRequestsPerSecond = 0;
+  private final long metricsUpdaterInterval;
+  private final ScheduledExecutorService scheduledExecutorService;
 
   private boolean logSlowRPC = false;
 
@@ -510,6 +519,14 @@ public abstract class Server {
     return logSlowRPC;
   }
 
+  public long getTotalRequests() {
+    return totalRequests.sum();
+  }
+
+  public long getTotalRequestsPerSecond() {
+    return totalRequestsPerSecond;
+  }
+
   /**
    * Sets slow RPC flag.
    * @param logSlowRPCFlag input logSlowRPCFlag.
@@ -573,6 +590,7 @@ public abstract class Server {
   }
 
   void updateMetrics(Call call, long startTime, boolean connDropped) {
+    totalRequests.increment();
     // delta = handler + processing + response
     long deltaNanos = Time.monotonicNowNanos() - startTime;
     long timestampNanos = call.timestampNanos;
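Worth noting (outside the diff): `updateMetrics` runs for every completed call, so the counter is a `LongAdder` rather than an `AtomicLong`; increments from many handler threads stay cheap, and the more expensive `sum()` is only taken by the getter and the periodic updater introduced below.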
@@ -3193,6 +3211,14 @@ public abstract class Server {
     this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
     this.exceptionsHandler.addTerseLoggingExceptions(
         HealthCheckFailedException.class);
+    this.metricsUpdaterInterval =
+        conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
+            CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
+    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
+            .build());
+    this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
+        metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
   }
 
   public synchronized void addAuxiliaryListener(int auxiliaryPort)
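Also outside the diff: `scheduleWithFixedDelay` measures the next delay from the end of the previous run, so the effective window can stretch slightly when an update is slow; the runner added further below compensates by dividing by the time actually elapsed since the last run rather than assuming a fixed interval.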
@@ -3487,10 +3513,25 @@ public abstract class Server {
     }
     responder.interrupt();
     notifyAll();
+    shutdownMetricsUpdaterExecutor();
     this.rpcMetrics.shutdown();
     this.rpcDetailedMetrics.shutdown();
   }
 
+  private void shutdownMetricsUpdaterExecutor() {
+    this.scheduledExecutorService.shutdown();
+    try {
+      boolean isExecutorShutdown =
+          this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
+      if (!isExecutorShutdown) {
+        LOG.info("Hadoop Metrics Updater executor could not be shutdown.");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e);
+    }
+  }
+
   /**
    * Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
@@ -3950,4 +3991,32 @@ public abstract class Server {
   public String getServerName() {
     return serverName;
   }
+
+  /**
+   * Server metrics updater thread, used to update some metrics on a regular basis.
+   * For instance, requests per second.
+   */
+  private class MetricsUpdateRunner implements Runnable {
+
+    private long lastExecuted = 0;
+
+    @Override
+    public synchronized void run() {
+      long currentTime = Time.monotonicNow();
+      if (lastExecuted == 0) {
+        lastExecuted = currentTime - metricsUpdaterInterval;
+      }
+      long currentTotalRequests = totalRequests.sum();
+      long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests;
+      lastSeenTotalRequests = currentTotalRequests;
+      if ((currentTime - lastExecuted) > 0) {
+        double totalRequestsPerSecInDouble =
+            (double) totalRequestsDiff / TimeUnit.MILLISECONDS.toSeconds(
+                currentTime - lastExecuted);
+        totalRequestsPerSecond = ((long) totalRequestsPerSecInDouble);
+      }
+      lastExecuted = currentTime;
+    }
+  }
+
 }
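Aside, not part of the commit: the runner above is a simple windowed-rate calculation — if 25,000 new requests were counted since the previous run and 5,000 ms elapsed, `TimeUnit.MILLISECONDS.toSeconds(5000)` yields 5 and the reported rate is 25,000 / 5 = 5,000 requests per second. A minimal, self-contained sketch of that arithmetic (class name and the fixed 5,000 ms window are illustrative only):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class WindowedRateExample {
  public static void main(String[] args) {
    LongAdder totalRequests = new LongAdder();
    // Pretend 25,000 requests arrived during a 5,000 ms window.
    for (int i = 0; i < 25_000; i++) {
      totalRequests.increment();
    }
    long lastSeen = 0;
    long elapsedMillis = 5_000;
    long current = totalRequests.sum();
    long diff = current - lastSeen;
    // Same formula as MetricsUpdateRunner: delta divided by elapsed whole seconds.
    double perSecond = (double) diff / TimeUnit.MILLISECONDS.toSeconds(elapsedMillis);
    System.out.println("requests/sec: " + (long) perSecond); // prints 5000
  }
}
```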
RpcMetrics.java

@@ -146,6 +146,16 @@ public class RpcMetrics {
     return server.getNumDroppedConnections();
   }
 
+  @Metric("Number of total requests")
+  public long getTotalRequests() {
+    return server.getTotalRequests();
+  }
+
+  @Metric("Number of total requests per second")
+  public long getTotalRequestsPerSecond() {
+    return server.getTotalRequestsPerSecond();
+  }
+
   public TimeUnit getMetricsTimeUnit() {
     return metricsTimeUnit;
   }
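Outside the diff: because these are `@Metric`-annotated getters on `RpcMetrics`, the two values should be published through the normal Hadoop metrics system alongside the existing per-port RPC metrics (and so surface wherever those are already collected, for example in a daemon's JMX output); they can also be read directly via `server.getRpcMetrics()`, which is what the new TestRPC case below does.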
Metrics.md

@@ -103,6 +103,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
 | `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `TotalRequests` | Total num of requests served by the RPC server. |
+| `TotalRequestsPerSeconds` | Total num of requests per second served by the RPC server. |
 
 RetryCache/NameNodeRetryCache
 -----------------------------
TestRPC.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;

@@ -29,6 +30,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos;

@@ -83,6 +85,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1665,6 +1668,60 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test
+  public void testNumTotalRequestsMetrics() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.
+        createUserForTesting("userXyz", new String[0]);
+
+    final Server server = setupTestServer(conf, 1);
+
+    ExecutorService executorService = null;
+    try {
+      RpcMetrics rpcMetrics = server.getRpcMetrics();
+      assertEquals(0, rpcMetrics.getTotalRequests());
+      assertEquals(0, rpcMetrics.getTotalRequestsPerSecond());
+
+      List<ExternalCall<Void>> externalCallList = new ArrayList<>();
+
+      executorService = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics")
+              .build());
+      AtomicInteger rps = new AtomicInteger(0);
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      executorService.submit(() -> {
+        while (true) {
+          int numRps = (int) rpcMetrics.getTotalRequestsPerSecond();
+          rps.getAndSet(numRps);
+          if (rps.get() > 0) {
+            countDownLatch.countDown();
+            break;
+          }
+        }
+      });
+
+      for (int i = 0; i < 100000; i++) {
+        externalCallList.add(newExtCall(ugi, () -> null));
+      }
+      for (ExternalCall<Void> externalCall : externalCallList) {
+        server.queueCall(externalCall);
+      }
+      for (ExternalCall<Void> externalCall : externalCallList) {
+        externalCall.get();
+      }
+
+      assertEquals(100000, rpcMetrics.getTotalRequests());
+      if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+        assertTrue(rps.get() > 10);
+      } else {
+        throw new AssertionError("total requests per seconds are still 0");
+      }
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+      server.stop();
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new TestRPC().testCallsInternal(conf);
TestJournalNodeSync.java

@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
     .getLogFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;

@@ -106,6 +108,8 @@ public class TestJournalNodeSync {
     File firstJournalDir = jCluster.getJournalDir(0, jid);
     File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
         .getCurrentDir();
+    assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics()
+        .getTotalRequests()).isGreaterThan(20);
 
     // Generate some edit logs and delete one.
     long firstTxId = generateEditLog();
TestConsistentReadsObserver.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -123,6 +124,7 @@ public class TestConsistentReadsObserver {
         + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
 
     NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
+    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);
 
     dfs.create(testPath, (short)1).close();
     assertSentTo(0);

@@ -132,6 +134,7 @@ public class TestConsistentReadsObserver {
     // be triggered and client should retry active NN.
     dfs.getFileStatus(testPath);
     assertSentTo(0);
+    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
     // reset the original call queue
     NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
   }