From 56ecf469f82feead169a77e1f3b6f3eabae1d509 Mon Sep 17 00:00:00 2001
From: Eric Payne
Date: Mon, 23 Nov 2020 20:48:07 +0000
Subject: [PATCH] HADOOP-17346. Fair call queue is defeated by abusive service
 principals. Contributed by Ahmed Hussein (ahussein).

---
 .../apache/hadoop/ipc/CallQueueManager.java   | 14 +++++
 .../apache/hadoop/ipc/DecayRpcScheduler.java  | 59 ++++++++++++++++---
 .../main/java/org/apache/hadoop/ipc/RPC.java  | 14 ++++-
 .../java/org/apache/hadoop/ipc/Server.java    | 17 +++++-
 .../hadoop/ipc/UserIdentityProvider.java      |  2 +-
 .../apache/hadoop/security/SecurityUtil.java  | 20 ++++++-
 .../ServiceAuthorizationManager.java          | 29 ++++-----
 .../hadoop/ipc/TestDecayRpcScheduler.java     |  3 +-
 .../java/org/apache/hadoop/ipc/TestRPC.java   | 37 ++++++++++++
 .../org/apache/hadoop/ipc/TestRpcBase.java    |  5 +-
 10 files changed, 171 insertions(+), 29 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 02876561b51..692f1a82b35 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -202,6 +203,19 @@ public class CallQueueManager
     return scheduler.getPriorityLevel(e);
   }
 
+  int getPriorityLevel(UserGroupInformation user) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+    }
+    return 0;
+  }
+
+  void setPriorityLevel(UserGroupInformation user, int priority) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+    }
+  }
+
   void setClientBackoffEnabled(boolean value) {
     clientBackOffEnabled = value;
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index c1ca419c29c..d1108a99356 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -173,6 +175,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
+  private final Map<Object, Integer> staticPriorities = new HashMap<>();
 
   /**
    * This TimerTask will call decayCurrentCosts until
    * the scheduler has been garbage collected.
@@ -476,7 +479,7 @@
       AtomicLong value = entry.getValue().get(0);
 
       long snapshot = value.get();
-      int computedLevel = computePriorityLevel(snapshot);
+      int computedLevel = computePriorityLevel(snapshot, id);
 
       nextCache.put(id, computedLevel);
     }
@@ -526,7 +529,11 @@
    * @param cost the cost for an identity
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long cost) {
+  private int computePriorityLevel(long cost, Object identity) {
+    Integer staticPriority = staticPriorities.get(identity);
+    if (staticPriority != null) {
+      return staticPriority.intValue();
+    }
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -566,11 +573,20 @@
     // Cache was no good, compute it
     List<AtomicLong> costList = callCosts.get(identity);
     long currentCost = costList == null ? 0 : costList.get(0).get();
-    int priority = computePriorityLevel(currentCost);
+    int priority = computePriorityLevel(currentCost, identity);
     LOG.debug("compute priority for {} priority {}", identity, priority);
     return priority;
   }
 
+  private String getIdentity(Schedulable obj) {
+    String identity = this.identityProvider.makeIdentity(obj);
+    if (identity == null) {
+      // Identity provider did not handle this
+      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+    }
+    return identity;
+  }
+
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
    * @param obj the schedulable obj to query and remember
@@ -579,15 +595,42 @@
   @Override
   public int getPriorityLevel(Schedulable obj) {
     // First get the identity
-    String identity = this.identityProvider.makeIdentity(obj);
-    if (identity == null) {
-      // Identity provider did not handle this
-      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
-    }
+    String identity = getIdentity(obj);
+    // highest priority users may have a negative priority but their
+    // calls will be priority 0.
+    return Math.max(0, cachedOrComputedPriorityLevel(identity));
+  }
 
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    String identity = getIdentity(newSchedulable(ugi));
+    // returns true priority of the user.
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    String identity = getIdentity(newSchedulable(ugi));
+    priority = Math.min(numLevels - 1, priority);
+    LOG.info("Setting priority for user:" + identity + "=" + priority);
+    staticPriorities.put(identity, priority);
+  }
+
+  // dummy instance to conform to identity provider api.
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable() {
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+
+      @Override
+      public int getPriorityLevel() {
+        return 0;
+      }
+    };
+  }
+
   @Override
   public boolean shouldBackOff(Schedulable obj) {
     Boolean backOff = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index d8929972f00..efb3258fec1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@
             " ProtocolImpl=" + protocolImpl.getClass().getName() +
             " protocolClass=" + protocolClass.getName());
       }
-    }
+      String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
+      if (client != null) {
+        // notify the server's rpc scheduler that the protocol user has
+        // highest priority. the scheduler should exempt the user from
+        // priority calculations.
+        try {
+          setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+        } catch (Exception ex) {
+          LOG.warn("Failed to set scheduling priority for " + client, ex);
+        }
+      }
+    }
 
     static class VerProtocolImpl {
       final long version;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index d06736a3294..0178139e864 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -637,7 +637,22 @@ public abstract class Server {
             address.getPort(), e);
       }
     }
-  
+
+  @VisibleForTesting
+  int getPriorityLevel(Schedulable e) {
+    return callQueue.getPriorityLevel(e);
+  }
+
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    return callQueue.getPriorityLevel(ugi);
+  }
+
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    callQueue.setPriorityLevel(ugi, priority);
+  }
+
   /**
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
index 763605e6a46..91ec1a259f1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
       return null;
     }
 
-    return ugi.getUserName();
+    return ugi.getShortUserName();
   }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index 2313119bfec..dbf1328a17e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -379,7 +379,25 @@ public final class SecurityUtil {
     }
     return null;
   }
-  
+
+  /**
+   * Look up the client principal for a given protocol. It searches all known
+   * SecurityInfo providers.
+   * @param protocol the protocol class to get the information for
+   * @param conf configuration object
+   * @return client principal or null if it has no client principal defined.
+   */
+  public static String getClientPrincipal(Class<?> protocol,
+      Configuration conf) {
+    String user = null;
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    if (krbInfo != null) {
+      String key = krbInfo.clientPrincipal();
+      user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+    }
+    return user;
+  }
+
   /**
    * Look up the TokenInfo for a given protocol. It searches all known
    * SecurityInfo providers.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
index 4c47348fa55..71c8be37088 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
@@ -99,22 +99,23 @@ public class ServiceAuthorizationManager {
     }
     // get client principal key to verify (if available)
-    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-    String clientPrincipal = null;
-    if (krbInfo != null) {
-      String clientKey = krbInfo.clientPrincipal();
-      if (clientKey != null && !clientKey.isEmpty()) {
-        try {
-          clientPrincipal = SecurityUtil.getServerPrincipal(
-              conf.get(clientKey), addr);
-        } catch (IOException e) {
-          throw (AuthorizationException) new AuthorizationException(
-              "Can't figure out Kerberos principal name for connection from "
-                  + addr + " for user=" + user + " protocol=" + protocol)
-              .initCause(e);
-        }
+    String clientPrincipal = null;
+
+
+    try {
+      clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+      if (clientPrincipal != null) {
+
+        clientPrincipal = SecurityUtil.getServerPrincipal(clientPrincipal,
+            addr);
       }
+    } catch (IOException e) {
+      throw (AuthorizationException) new AuthorizationException(
+          "Can't figure out Kerberos principal name for connection from "
+              + addr + " for user=" + user + " protocol=" + protocol)
+          .initCause(e);
     }
+
     if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
        acls.length != 2  || !acls[0].isUserAllowed(user) ||
        acls[1].isUserAllowed(user)) {
       String cause = clientPrincipal != null ?
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 7bdc6b5e96d..ca052f0c7f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
-    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
 
-    when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
 
     return mockCall;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index f0d9baf33cb..40e799545a9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1293,6 +1293,43 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test (timeout=30000)
+  public void testProtocolUserPriority() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+    Server server = null;
+    try {
+      server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+      // normal users start with priority 0.
+      Assert.assertEquals(0, server.getPriorityLevel(ugi));
+      // calls for a normal user will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+      // protocol defined client will have top priority of -1.
+      ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+      Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+    } finally {
+      stop(server, null);
+    }
+  }
+
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable(){
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+      @Override
+      public int getPriorityLevel() {
+        return 0; // doesn't matter.
+      }
+    };
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 2f2d36f7b45..22ea918583d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -62,6 +62,8 @@ public class TestRpcBase {
 
   protected final static String SERVER_PRINCIPAL_KEY =
       "test.ipc.server.principal";
+  protected final static String CLIENT_PRINCIPAL_KEY =
+      "test.ipc.client.principal";
   protected final static String ADDRESS = "0.0.0.0";
   protected final static int PORT = 0;
   protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@
     }
   }
 
-  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+      clientPrincipal = CLIENT_PRINCIPAL_KEY)
   @TokenInfo(TestTokenSelector.class)
   @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
       protocolVersion = 1)