From 1f612f728b564aeb310002eac04343093477e72e Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Fri, 7 Apr 2023 11:00:48 +0800 Subject: [PATCH] HADOOP-18692. User in staticPriorities cost also shouldn't be accumulated to totalDecayedCallCost and totalRawCallCost. --- .../apache/hadoop/ipc/DecayRpcScheduler.java | 18 +++--- .../hadoop/ipc/TestDecayRpcScheduler.java | 4 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 60 +++++++++++++++++-- 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 63274bb01e7..8906a962759 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -197,7 +196,6 @@ public class DecayRpcScheduler implements RpcScheduler, private MetricsProxy metricsProxy; private final CostProvider costProvider; private final Map staticPriorities = new HashMap<>(); - private Set serviceUserNames; /** * This TimerTask will call decayCurrentCosts until @@ -249,7 +247,7 @@ public class DecayRpcScheduler implements RpcScheduler, conf); this.backOffResponseTimeThresholds = parseBackOffResponseTimeThreshold(ns, conf, numLevels); - this.serviceUserNames = this.parseServiceUserNames(ns, conf); + this.parseServiceUserNames(ns, conf); // Setup response time metrics responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels); @@ -406,10 +404,12 @@ public class DecayRpcScheduler implements RpcScheduler, return decimals; } - private Set parseServiceUserNames(String ns, Configuration conf) { + private void parseServiceUserNames(String ns, Configuration conf) { Collection collection = conf.getStringCollection( ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY); - return new HashSet<>(collection); + for (String user : collection) { + staticPriorities.put(user, -1); + } } /** @@ -607,10 +607,6 @@ public class DecayRpcScheduler implements RpcScheduler, * @return scheduling decision from 0 to numLevels - 1 */ private int computePriorityLevel(long cost, Object identity) { - // The priority for service users is always 0 - if (isServiceUser((String)identity)) { - return 0; - } Integer staticPriority = staticPriorities.get(identity); if (staticPriority != null) { return staticPriority.intValue(); @@ -714,7 +710,7 @@ public class DecayRpcScheduler implements RpcScheduler, } private boolean isServiceUser(String userName) { - return this.serviceUserNames.contains(userName); + return this.staticPriorities.getOrDefault(userName, 0) < 0; } @Override @@ -816,7 +812,7 @@ public class DecayRpcScheduler implements RpcScheduler, @VisibleForTesting Set getServiceUserNames() { - return serviceUserNames; + return staticPriorities.keySet(); } @VisibleForTesting diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index 4ae3de1b158..190ceabad97 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -482,8 +482,8 @@ public class TestDecayRpcScheduler { String summary = scheduler.getSchedulingDecisionSummary(); Map summaryMap = (Map) JSON.parse(summary); assertNotEquals(0L, summaryMap.get("user1")); - assertEquals(0L, summaryMap.get("service1")); - assertEquals(0L, summaryMap.get("service2")); + assertEquals(-1L, summaryMap.get("service1")); + assertEquals(-1L, summaryMap.get("service2")); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 9126316fca6..5f56e428693 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.metrics.RpcMetrics; +import org.apache.hadoop.test.Whitebox; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.protobuf.ServiceException; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -1583,17 +1584,64 @@ public class TestRPC extends TestRpcBase { try { server = setupDecayRpcSchedulerandTestServer(ns + "."); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user"); + UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user"); // normal users start with priority 0. - Assert.assertEquals(0, server.getPriorityLevel(ugi)); + Assert.assertEquals(0, server.getPriorityLevel(ugi1)); // calls for a protocol defined client will have priority of 0. - Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi))); + Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi1))); // protocol defined client will have top priority of -1. - ugi = UserGroupInformation.createRemoteUser("clientForProtocol"); - Assert.assertEquals(-1, server.getPriorityLevel(ugi)); + UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("clientForProtocol"); + Assert.assertEquals(-1, server.getPriorityLevel(ugi2)); // calls for a protocol defined client will have priority of 0. - Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi))); + Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi2))); + + // user call + ugi1.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + TestRpcService proxy = getClient(addr, conf); + for (int i = 0; i < 10; i++) { + proxy.ping(null, newEmptyRequest()); + } + return null; + } + }); + // clientForProtocol call + ugi2.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + TestRpcService proxy = getClient(addr, conf); + for (int i = 0; i < 30; i++) { + proxy.ping(null, newEmptyRequest()); + } + return null; + } + }); + + CallQueueManager callQueueManager = + (CallQueueManager) Whitebox.getInternalState(server, "callQueue"); + DecayRpcScheduler scheduler = + (DecayRpcScheduler) Whitebox.getInternalState(callQueueManager, "scheduler"); + Assert.assertNotNull(scheduler); + + // test total costs. + assertEquals(10, scheduler.getTotalCallVolume()); + assertEquals(10, scheduler.getTotalRawCallVolume()); + assertEquals(30, scheduler.getTotalServiceUserCallVolume()); + assertEquals(30, scheduler.getTotalServiceUserRawCallVolume()); + assertEquals(1, scheduler.getPriorityLevel(newSchedulable(ugi1))); + assertEquals(0, scheduler.getPriorityLevel(newSchedulable(ugi2))); + + // test total costs after decay. + scheduler.forceDecay(); + assertEquals(5, scheduler.getTotalCallVolume()); + assertEquals(10, scheduler.getTotalRawCallVolume()); + assertEquals(15, scheduler.getTotalServiceUserCallVolume()); + assertEquals(30, scheduler.getTotalServiceUserRawCallVolume()); + assertEquals(1, scheduler.getPriorityLevel(newSchedulable(ugi1))); + assertEquals(0, scheduler.getPriorityLevel(newSchedulable(ugi2))); + } finally { stop(server, null); }