From 3b38d011a8c7fa7c42acbc8e7a6c214cc76c0292 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Wed, 25 Aug 2021 17:34:45 +0800 Subject: [PATCH] HDFS-16175.Improve the configurable value of Server #PURGE_INTERVAL_NANOS. (#3307) Co-authored-by: zhujianghua Reviewed-by: Ayush Saxena (cherry picked from commit ad54f5195c8c01f333703c55cd70703109d75f29) (cherry picked from commit 2b2f8f575bffb18df53a3cf70cc45ba6384798e2) --- .../fs/CommonConfigurationKeysPublic.java | 4 +++ .../java/org/apache/hadoop/ipc/Server.java | 29 +++++++++++++++---- .../src/main/resources/core-default.xml | 8 +++++ .../org/apache/hadoop/ipc/TestServer.java | 20 +++++++++++++ 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 64364ac5b8b..8761a25814e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -459,6 +459,10 @@ public class CommonConfigurationKeysPublic { "ipc.server.log.slow.rpc"; public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false; + public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY = + "ipc.server.purge.interval"; + public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15; + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index f152368be1d..5f4b957b15a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -480,6 +480,8 @@ public abstract class Server { volatile private boolean running = true; // true while server runs private CallQueueManager callQueue; + private long purgeIntervalNanos; + // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; @@ -509,6 +511,20 @@ public abstract class Server { this.logSlowRPC = logSlowRPCFlag; } + private void setPurgeIntervalNanos(int purgeInterval) { + int tmpPurgeInterval = CommonConfigurationKeysPublic. + IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT; + if (purgeInterval > 0) { + tmpPurgeInterval = purgeInterval; + } + this.purgeIntervalNanos = TimeUnit.NANOSECONDS.convert( + tmpPurgeInterval, TimeUnit.MINUTES); + } + + @VisibleForTesting + public long getPurgeIntervalNanos() { + return this.purgeIntervalNanos; + } /** * Logs a Slow RPC Request. @@ -1477,9 +1493,6 @@ public abstract class Server { } } - private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert( - 15, TimeUnit.MINUTES); - // Sends responses of RPC back to clients. private class Responder extends Thread { private final Selector writeSelector; @@ -1515,7 +1528,7 @@ public abstract class Server { try { waitPending(); // If a channel is being registered, wait. writeSelector.select( - TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS)); + TimeUnit.NANOSECONDS.toMillis(purgeIntervalNanos)); Iterator iter = writeSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -1538,7 +1551,7 @@ public abstract class Server { } } long nowNanos = Time.monotonicNowNanos(); - if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) { + if (nowNanos < lastPurgeTimeNanos + purgeIntervalNanos) { continue; } lastPurgeTimeNanos = nowNanos; @@ -1616,7 +1629,7 @@ public abstract class Server { Iterator iter = responseQueue.listIterator(0); while (iter.hasNext()) { call = iter.next(); - if (now > call.responseTimestampNanos + PURGE_INTERVAL_NANOS) { + if (now > call.responseTimestampNanos + purgeIntervalNanos) { closeConnection(call.connection); break; } @@ -3119,6 +3132,10 @@ public abstract class Server { CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC, CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT)); + this.setPurgeIntervalNanos(conf.getInt( + CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT)); + // Create the responder here responder = new Responder(); diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 3ef98086e1a..5296e882df5 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2107,6 +2107,14 @@ + + ipc.server.purge.interval + 15 + Define how often calls are cleaned up in the server. + The default is 15 minutes. The unit is minutes. + + + ipc.maximum.data.length 67108864 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java index 420d6b94084..0a24c456cc8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java @@ -26,8 +26,10 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Server.Call; @@ -185,4 +187,22 @@ public class TestServer { assertTrue(handler.isSuppressedLog(IpcException.class)); assertFalse(handler.isSuppressedLog(RpcClientException.class)); } + + @Test (timeout=300000) + public void testPurgeIntervalNanosConf() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeysPublic. + IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY, 3); + Server server = new Server("0.0.0.0", 0, LongWritable.class, + 1, conf) { + @Override + public Writable call( + RPC.RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws Exception { + return null; + } + }; + long purgeInterval = TimeUnit.NANOSECONDS.convert(3, TimeUnit.MINUTES); + assertEquals(server.getPurgeIntervalNanos(), purgeInterval); + } }