HDFS-16296. RouterRpcFairnessPolicyController add rejected permits for each nameservice (#3613)

This commit is contained in:
Symious 2021-11-05 11:03:07 +08:00 committed by GitHub
parent 1032724aa3
commit 6f7b965808
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 0 deletions

View File

@ -120,4 +120,10 @@ public interface FederationRPCMBean {
* @return Number of operations rejected due to lack of permits. * @return Number of operations rejected due to lack of permits.
*/ */
long getProxyOpPermitRejected(); long getProxyOpPermitRejected();
/**
* Get the number of operations rejected due to lack of permits of each namespace.
* @return Number of operations rejected due to lack of permits of each namespace.
*/
String getProxyOpPermitRejectedPerNs();
} }

View File

@ -286,4 +286,9 @@ public void incrProxyOpPermitRejected() {
public long getProxyOpPermitRejected() { public long getProxyOpPermitRejected() {
return proxyOpPermitRejected.value(); return proxyOpPermitRejected.value();
} }
@Override
public String getProxyOpPermitRejectedPerNs() {
return rpcServer.getRPCClient().getRejectedPermitsPerNsJSON();
}
} }

View File

@ -47,6 +47,7 @@
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -54,6 +55,7 @@
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -134,6 +136,7 @@ public class RouterRpcClient {
/** Fairness manager to control handlers assigned per NS. */ /** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController; private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
/** /**
* Create a router RPC client to manage remote procedure calls to NNs. * Create a router RPC client to manage remote procedure calls to NNs.
@ -319,6 +322,15 @@ public String getAsyncCallerPoolJson() {
return JSON.toString(info); return JSON.toString(info);
} }
/**
* JSON representation of the rejected permits for each nameservice.
*
* @return String representation of the rejected permits for each nameservice.
*/
public String getRejectedPermitsPerNsJSON() {
return JSON.toString(rejectedPermitsPerNs);
}
/** /**
* Get ClientProtocol proxy client for a NameNode. Each combination of user + * Get ClientProtocol proxy client for a NameNode. Each combination of user +
* NN must use a unique proxy client. Previously created clients are cached * NN must use a unique proxy client. Previously created clients are cached
@ -1544,6 +1556,7 @@ private void acquirePermit(
if (rpcMonitor != null) { if (rpcMonitor != null) {
rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected(); rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
} }
incrRejectedPermitForNs(nsId);
LOG.debug("Permit denied for ugi: {} for method: {}", LOG.debug("Permit denied for ugi: {} for method: {}",
ugi, m.getMethodName()); ugi, m.getMethodName());
String msg = String msg =
@ -1576,4 +1589,13 @@ private void releasePermit(
return (AbstractRouterRpcFairnessPolicyController return (AbstractRouterRpcFairnessPolicyController
)routerRpcFairnessPolicyController; )routerRpcFairnessPolicyController;
} }
private void incrRejectedPermitForNs(String ns) {
rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
}
public Long getRejectedPermitForNs(String ns) {
return rejectedPermitsPerNs.containsKey(ns) ?
rejectedPermitsPerNs.get(ns).longValue() : 0L;
}
} }

View File

@ -86,6 +86,8 @@
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient; import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@ -267,6 +269,14 @@ public DFSClient getClient() throws IOException, URISyntaxException {
public Configuration getConf() { public Configuration getConf() {
return conf; return conf;
} }
public RouterRpcServer getRouterRpcServer() {
return router.getRpcServer();
}
public RouterRpcClient getRouterRpcClient() {
return getRouterRpcServer().getRPCClient();
}
} }
/** /**

View File

@ -146,6 +146,7 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
final int numOps = 10; final int numOps = 10;
final AtomicInteger overloadException = new AtomicInteger(); final AtomicInteger overloadException = new AtomicInteger();
int originalRejectedPermits = getTotalRejectedPermits(routerContext);
for (int i = 0; i < numOps; i++) { for (int i = 0; i < numOps; i++) {
DFSClient routerClient = null; DFSClient routerClient = null;
@ -177,6 +178,9 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
} }
overloadException.get(); overloadException.get();
} }
int latestRejectedPermits = getTotalRejectedPermits(routerContext);
assertEquals(latestRejectedPermits - originalRejectedPermits,
overloadException.get());
if (fairness) { if (fairness) {
assertTrue(overloadException.get() > 0); assertTrue(overloadException.get() > 0);
@ -208,4 +212,14 @@ private void invokeConcurrent(ClientProtocol routerProto, String clientName)
routerProto.renewLease(clientName); routerProto.renewLease(clientName);
} }
private int getTotalRejectedPermits(RouterContext routerContext) {
int totalRejectedPermits = 0;
for (String ns : cluster.getNameservices()) {
totalRejectedPermits += routerContext.getRouterRpcClient()
.getRejectedPermitForNs(ns);
}
totalRejectedPermits += routerContext.getRouterRpcClient()
.getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
return totalRejectedPermits;
}
} }