HADOOP-18692. User in staticPriorities cost also shouldn't be accumulated to totalDecayedCallCost and totalRawCallCost.

This commit is contained in:
zhengchenyu 2023-04-07 11:00:48 +08:00
parent e45451f9c7
commit 1f612f728b
3 changed files with 63 additions and 19 deletions

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -197,7 +196,6 @@ public class DecayRpcScheduler implements RpcScheduler,
private MetricsProxy metricsProxy; private MetricsProxy metricsProxy;
private final CostProvider costProvider; private final CostProvider costProvider;
private final Map<String, Integer> staticPriorities = new HashMap<>(); private final Map<String, Integer> staticPriorities = new HashMap<>();
private Set<String> serviceUserNames;
/** /**
* This TimerTask will call decayCurrentCosts until * This TimerTask will call decayCurrentCosts until
@ -249,7 +247,7 @@ public class DecayRpcScheduler implements RpcScheduler,
conf); conf);
this.backOffResponseTimeThresholds = this.backOffResponseTimeThresholds =
parseBackOffResponseTimeThreshold(ns, conf, numLevels); parseBackOffResponseTimeThreshold(ns, conf, numLevels);
this.serviceUserNames = this.parseServiceUserNames(ns, conf); this.parseServiceUserNames(ns, conf);
// Setup response time metrics // Setup response time metrics
responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels); responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
@ -406,10 +404,12 @@ public class DecayRpcScheduler implements RpcScheduler,
return decimals; return decimals;
} }
private Set<String> parseServiceUserNames(String ns, Configuration conf) { private void parseServiceUserNames(String ns, Configuration conf) {
Collection<String> collection = conf.getStringCollection( Collection<String> collection = conf.getStringCollection(
ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY); ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY);
return new HashSet<>(collection); for (String user : collection) {
staticPriorities.put(user, -1);
}
} }
/** /**
@ -607,10 +607,6 @@ public class DecayRpcScheduler implements RpcScheduler,
* @return scheduling decision from 0 to numLevels - 1 * @return scheduling decision from 0 to numLevels - 1
*/ */
private int computePriorityLevel(long cost, Object identity) { private int computePriorityLevel(long cost, Object identity) {
// The priority for service users is always 0
if (isServiceUser((String)identity)) {
return 0;
}
Integer staticPriority = staticPriorities.get(identity); Integer staticPriority = staticPriorities.get(identity);
if (staticPriority != null) { if (staticPriority != null) {
return staticPriority.intValue(); return staticPriority.intValue();
@ -714,7 +710,7 @@ public class DecayRpcScheduler implements RpcScheduler,
} }
private boolean isServiceUser(String userName) { private boolean isServiceUser(String userName) {
return this.serviceUserNames.contains(userName); return this.staticPriorities.getOrDefault(userName, 0) < 0;
} }
@Override @Override
@ -816,7 +812,7 @@ public class DecayRpcScheduler implements RpcScheduler,
@VisibleForTesting @VisibleForTesting
Set<String> getServiceUserNames() { Set<String> getServiceUserNames() {
return serviceUserNames; return staticPriorities.keySet();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -482,8 +482,8 @@ public class TestDecayRpcScheduler {
String summary = scheduler.getSchedulingDecisionSummary(); String summary = scheduler.getSchedulingDecisionSummary();
Map<String, Object> summaryMap = (Map<String, Object>) JSON.parse(summary); Map<String, Object> summaryMap = (Map<String, Object>) JSON.parse(summary);
assertNotEquals(0L, summaryMap.get("user1")); assertNotEquals(0L, summaryMap.get("user1"));
assertEquals(0L, summaryMap.get("service1")); assertEquals(-1L, summaryMap.get("service1"));
assertEquals(0L, summaryMap.get("service2")); assertEquals(-1L, summaryMap.get("service2"));
} }
/** /**

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.protobuf.ServiceException; import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -1583,17 +1584,64 @@ public class TestRPC extends TestRpcBase {
try { try {
server = setupDecayRpcSchedulerandTestServer(ns + "."); server = setupDecayRpcSchedulerandTestServer(ns + ".");
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user"); UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user");
// normal users start with priority 0. // normal users start with priority 0.
Assert.assertEquals(0, server.getPriorityLevel(ugi)); Assert.assertEquals(0, server.getPriorityLevel(ugi1));
// calls for a protocol defined client will have priority of 0. // calls for a protocol defined client will have priority of 0.
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi))); Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi1)));
// protocol defined client will have top priority of -1. // protocol defined client will have top priority of -1.
ugi = UserGroupInformation.createRemoteUser("clientForProtocol"); UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("clientForProtocol");
Assert.assertEquals(-1, server.getPriorityLevel(ugi)); Assert.assertEquals(-1, server.getPriorityLevel(ugi2));
// calls for a protocol defined client will have priority of 0. // calls for a protocol defined client will have priority of 0.
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi))); Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi2)));
// user call
ugi1.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
TestRpcService proxy = getClient(addr, conf);
for (int i = 0; i < 10; i++) {
proxy.ping(null, newEmptyRequest());
}
return null;
}
});
// clientForProtocol call
ugi2.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
TestRpcService proxy = getClient(addr, conf);
for (int i = 0; i < 30; i++) {
proxy.ping(null, newEmptyRequest());
}
return null;
}
});
CallQueueManager callQueueManager =
(CallQueueManager) Whitebox.getInternalState(server, "callQueue");
DecayRpcScheduler scheduler =
(DecayRpcScheduler) Whitebox.getInternalState(callQueueManager, "scheduler");
Assert.assertNotNull(scheduler);
// test total costs.
assertEquals(10, scheduler.getTotalCallVolume());
assertEquals(10, scheduler.getTotalRawCallVolume());
assertEquals(30, scheduler.getTotalServiceUserCallVolume());
assertEquals(30, scheduler.getTotalServiceUserRawCallVolume());
assertEquals(1, scheduler.getPriorityLevel(newSchedulable(ugi1)));
assertEquals(0, scheduler.getPriorityLevel(newSchedulable(ugi2)));
// test total costs after decay.
scheduler.forceDecay();
assertEquals(5, scheduler.getTotalCallVolume());
assertEquals(10, scheduler.getTotalRawCallVolume());
assertEquals(15, scheduler.getTotalServiceUserCallVolume());
assertEquals(30, scheduler.getTotalServiceUserRawCallVolume());
assertEquals(1, scheduler.getPriorityLevel(newSchedulable(ugi1)));
assertEquals(0, scheduler.getPriorityLevel(newSchedulable(ugi2)));
} finally { } finally {
stop(server, null); stop(server, null);
} }