diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 7a17ced812e..b72ad164fdc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,6 +213,19 @@ public class CallQueueManager
     return scheduler.getPriorityLevel(e);
   }
 
+  int getPriorityLevel(UserGroupInformation user) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+    }
+    return 0;
+  }
+
+  void setPriorityLevel(UserGroupInformation user, int priority) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+    }
+  }
+
   void setClientBackoffEnabled(boolean value) {
     clientBackOffEnabled = value;
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 97baa65d878..5477c971c3a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -40,6 +40,8 @@
 import javax.management.ObjectName;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -193,6 +195,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private static final double PRECISION = 0.0001;
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
+  private final Map<Object, Integer> staticPriorities = new HashMap<>();
   private Set<String> serviceUserNames;
 
   /**
@@ -581,7 +584,10 @@ public class DecayRpcScheduler implements RpcScheduler,
     if (isServiceUser((String)identity)) {
       return 0;
     }
-
+    Integer staticPriority = staticPriorities.get(identity);
+    if (staticPriority != null) {
+      return staticPriority.intValue();
+    }
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -626,6 +632,15 @@
     return priority;
   }
 
+  private String getIdentity(Schedulable obj) {
+    String identity = this.identityProvider.makeIdentity(obj);
+    if (identity == null) {
+      // Identity provider did not handle this
+      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+    }
+    return identity;
+  }
+
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
   * @param obj the schedulable obj to query and remember
@@ -634,15 +649,42 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public int getPriorityLevel(Schedulable obj) {
     // First get the identity
-    String identity = this.identityProvider.makeIdentity(obj);
-    if (identity == null) {
-      // Identity provider did not handle this
-      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
-    }
+    String identity = getIdentity(obj);
+    // highest priority users may have a negative priority but their
+    // calls will be priority 0.
+    return Math.max(0, cachedOrComputedPriorityLevel(identity));
+  }
 
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    String identity = getIdentity(newSchedulable(ugi));
+    // returns true priority of the user.
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    String identity = getIdentity(newSchedulable(ugi));
+    priority = Math.min(numLevels - 1, priority);
+    LOG.info("Setting priority for user:" + identity + "=" + priority);
+    staticPriorities.put(identity, priority);
+  }
+
+  // dummy instance to conform to identity provider api.
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable() {
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+
+      @Override
+      public int getPriorityLevel() {
+        return 0;
+      }
+    };
+  }
+
   private boolean isServiceUser(String userName) {
     return this.serviceUserNames.contains(userName);
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index ad3628d0185..6169fef7f6d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@ public class RPC {
             " ProtocolImpl=" + protocolImpl.getClass().getName() +
             " protocolClass=" + protocolClass.getName());
       }
-    }
+      String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
+      if (client != null) {
+        // notify the server's rpc scheduler that the protocol user has
+        // highest priority. the scheduler should exempt the user from
+        // priority calculations.
+        try {
+          setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+        } catch (Exception ex) {
+          LOG.warn("Failed to set scheduling priority for " + client, ex);
+        }
+      }
+    }
 
     static class VerProtocolImpl {
       final long version;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 3afad2135f0..cbee1234ec5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -643,7 +643,22 @@ public abstract class Server {
           address.getPort(), e);
     }
   }
-
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  int getPriorityLevel(Schedulable e) {
+    return callQueue.getPriorityLevel(e);
+  }
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    return callQueue.getPriorityLevel(ugi);
+  }
+
+  @org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    callQueue.setPriorityLevel(ugi, priority);
+  }
+
   /**
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
index 763605e6a46..91ec1a259f1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
       return null;
     }
 
-    return ugi.getUserName();
+    return ugi.getShortUserName();
   }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index 81f8143f118..3b9e9c53e44 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -380,7 +380,25 @@ public final class SecurityUtil {
     }
     return null;
   }
-
+
+  /**
+   * Look up the client principal for a given protocol. It searches all known
+   * SecurityInfo providers.
+   * @param protocol the protocol class to get the information for
+   * @param conf configuration object
+   * @return client principal or null if it has no client principal defined.
+   */
+  public static String getClientPrincipal(Class<?> protocol,
+      Configuration conf) {
+    String user = null;
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    if (krbInfo != null) {
+      String key = krbInfo.clientPrincipal();
+      user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+    }
+    return user;
+  }
+
   /**
    * Look up the TokenInfo for a given protocol. It searches all known
   * SecurityInfo providers.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
index 27a6355b653..c83afc7fe4b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.MachineList;
@@ -101,21 +100,19 @@ public class ServiceAuthorizationManager {
     String clientPrincipal = null;
     if (UserGroupInformation.isSecurityEnabled()) {
       // get client principal key to verify (if available)
-      KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-      if (krbInfo != null) {
-        String clientKey = krbInfo.clientPrincipal();
-        if (clientKey != null && !clientKey.isEmpty()) {
-          try {
-            clientPrincipal = SecurityUtil.getServerPrincipal(
-                conf.get(clientKey), addr);
-          } catch (IOException e) {
-            throw (AuthorizationException) new AuthorizationException(
-                "Can't figure out Kerberos principal name for connection from "
-                    + addr + " for user=" + user + " protocol=" + protocol)
-                .initCause(e);
-          }
+      clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+      try {
+        if (clientPrincipal != null) {
+          clientPrincipal =
+              SecurityUtil.getServerPrincipal(clientPrincipal, addr);
         }
+      } catch (IOException e) {
+        throw (AuthorizationException) new AuthorizationException(
+            "Can't figure out Kerberos principal name for connection from "
+                + addr + " for user=" + user + " protocol=" + protocol)
+            .initCause(e);
       }
+    }
     if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
        acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index a6531b21afc..fee43b83dae 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -48,9 +48,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
-    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
 
-    when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
 
     return mockCall;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 628c044b1ed..9fbb865c6e5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1294,6 +1294,43 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test (timeout=30000)
+  public void testProtocolUserPriority() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+    Server server = null;
+    try {
+      server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+      // normal users start with priority 0.
+      Assert.assertEquals(0, server.getPriorityLevel(ugi));
+      // calls for a normal user will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+      // protocol defined client will have top priority of -1.
+      ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+      Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+    } finally {
+      stop(server, null);
+    }
+  }
+
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable(){
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+      @Override
+      public int getPriorityLevel() {
+        return 0; // doesn't matter.
+      }
+    };
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 010935b6096..0962b50099c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -62,6 +62,8 @@ public class TestRpcBase {
 
   protected final static String SERVER_PRINCIPAL_KEY =
       "test.ipc.server.principal";
+  protected final static String CLIENT_PRINCIPAL_KEY =
+      "test.ipc.client.principal";
   protected final static String ADDRESS = "0.0.0.0";
   protected final static int PORT = 0;
   protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@
     }
   }
 
-  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+      clientPrincipal = CLIENT_PRINCIPAL_KEY)
   @TokenInfo(TestTokenSelector.class)
   @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
       protocolVersion = 1)
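
Usage note (illustrative sketch, not part of the patch): the snippet below mirrors the lookup that the new SecurityUtil.getClientPrincipal() performs. A protocol's @KerberosInfo clientPrincipal element names a configuration key, and the value of that key is the principal that RPC.Server now passes to setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1), which DecayRpcScheduler stores as a static priority and never decays. The ExampleProtocol interface, the example.client.principal key, and the principal value are made-up placeholders, not names from the Hadoop source.

// Standalone sketch; assumes hadoop-common is on the classpath.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.KerberosInfo;

public class ClientPrincipalLookupExample {

  // Hypothetical protocol: clientPrincipal names a conf key, not a principal.
  @KerberosInfo(serverPrincipal = "example.server.principal",
      clientPrincipal = "example.client.principal")
  interface ExampleProtocol {
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A deployment maps the key to the actual Kerberos principal.
    conf.set("example.client.principal", "exampleclient@EXAMPLE.COM");

    // Same steps as SecurityUtil.getClientPrincipal(protocol, conf):
    // read the annotation, then resolve its key against the configuration.
    KerberosInfo krbInfo = ExampleProtocol.class.getAnnotation(KerberosInfo.class);
    String key = (krbInfo != null) ? krbInfo.clientPrincipal() : null;
    String client = (key != null && !key.isEmpty()) ? conf.get(key) : null;

    // With the patch applied, registering ExampleProtocol on an RPC server
    // would exempt this user from DecayRpcScheduler's priority decay.
    System.out.println("client principal exempted from decay: " + client);
  }
}

The -1 is intentional: getPriorityLevel(Schedulable) clamps the result with Math.max(0, ...), so actual calls from the exempted user are scheduled at level 0 while the raw per-user priority stays at -1, which is exactly what the new TestRPC#testProtocolUserPriority asserts.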