HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).
This commit is contained in:
parent
6062978768
commit
c7845dc574
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -201,6 +202,19 @@ public class CallQueueManager<E extends Schedulable>
|
|||
return scheduler.getPriorityLevel(e);
|
||||
}
|
||||
|
||||
int getPriorityLevel(UserGroupInformation user) {
|
||||
if (scheduler instanceof DecayRpcScheduler) {
|
||||
return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void setPriorityLevel(UserGroupInformation user, int priority) {
|
||||
if (scheduler instanceof DecayRpcScheduler) {
|
||||
((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
|
||||
}
|
||||
}
|
||||
|
||||
void setClientBackoffEnabled(boolean value) {
|
||||
clientBackOffEnabled = value;
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.metrics2.lib.Interns;
|
|||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
||||
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -172,7 +173,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|||
private static final double PRECISION = 0.0001;
|
||||
private MetricsProxy metricsProxy;
|
||||
private final CostProvider costProvider;
|
||||
|
||||
private final Map<String,Integer> staticPriorities = new HashMap<>();
|
||||
/**
|
||||
* This TimerTask will call decayCurrentCosts until
|
||||
* the scheduler has been garbage collected.
|
||||
|
@ -468,7 +469,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|||
AtomicLong value = entry.getValue().get(0);
|
||||
|
||||
long snapshot = value.get();
|
||||
int computedLevel = computePriorityLevel(snapshot);
|
||||
int computedLevel = computePriorityLevel(snapshot, id);
|
||||
|
||||
nextCache.put(id, computedLevel);
|
||||
}
|
||||
|
@ -515,10 +516,15 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|||
/**
|
||||
* Given the cost for an identity, compute a scheduling decision.
|
||||
*
|
||||
* @param identity to compute a cost
|
||||
* @param cost the cost for an identity
|
||||
* @return scheduling decision from 0 to numLevels - 1
|
||||
*/
|
||||
private int computePriorityLevel(long cost) {
|
||||
private int computePriorityLevel(long cost, Object identity) {
|
||||
Integer staticPriority = staticPriorities.get(identity);
|
||||
if (staticPriority != null) {
|
||||
return staticPriority.intValue();
|
||||
}
|
||||
long totalCallSnapshot = totalDecayedCallCost.get();
|
||||
|
||||
double proportion = 0;
|
||||
|
@ -558,11 +564,20 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|||
// Cache was no good, compute it
|
||||
List<AtomicLong> costList = callCosts.get(identity);
|
||||
long currentCost = costList == null ? 0 : costList.get(0).get();
|
||||
int priority = computePriorityLevel(currentCost);
|
||||
int priority = computePriorityLevel(currentCost, identity);
|
||||
LOG.debug("compute priority for {} priority {}", identity, priority);
|
||||
return priority;
|
||||
}
|
||||
|
||||
private String getIdentity(Schedulable obj) {
|
||||
String identity = this.identityProvider.makeIdentity(obj);
|
||||
if (identity == null) {
|
||||
// Identity provider did not handle this
|
||||
identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
|
||||
}
|
||||
return identity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the appropriate priority for a schedulable based on past requests.
|
||||
* @param obj the schedulable obj to query and remember
|
||||
|
@ -571,15 +586,41 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|||
@Override
|
||||
public int getPriorityLevel(Schedulable obj) {
|
||||
// First get the identity
|
||||
String identity = this.identityProvider.makeIdentity(obj);
|
||||
if (identity == null) {
|
||||
// Identity provider did not handle this
|
||||
identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
|
||||
String identity = getIdentity(obj);
|
||||
// highest priority users may have a negative priority but their
|
||||
// calls will be priority 0.
|
||||
return Math.max(0, cachedOrComputedPriorityLevel(identity));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getPriorityLevel(UserGroupInformation ugi) {
|
||||
String identity = getIdentity(newSchedulable(ugi));
|
||||
// returns true priority of the user.
|
||||
return cachedOrComputedPriorityLevel(identity);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setPriorityLevel(UserGroupInformation ugi, int priority) {
|
||||
String identity = getIdentity(newSchedulable(ugi));
|
||||
priority = Math.min(numLevels - 1, priority);
|
||||
LOG.info("Setting priority for user:" + identity + "=" + priority);
|
||||
staticPriorities.put(identity, priority);
|
||||
}
|
||||
|
||||
// dummy instance to conform to identity provider api.
|
||||
private static Schedulable newSchedulable(UserGroupInformation ugi) {
|
||||
return new Schedulable() {
|
||||
@Override
|
||||
public UserGroupInformation getUserGroupInformation() {
|
||||
return ugi;
|
||||
}
|
||||
@Override
|
||||
public int getPriorityLevel() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldBackOff(Schedulable obj) {
|
||||
Boolean backOff = false;
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
|
|||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
@ -946,6 +947,17 @@ public class RPC {
|
|||
" ProtocolImpl=" + protocolImpl.getClass().getName() +
|
||||
" protocolClass=" + protocolClass.getName());
|
||||
}
|
||||
String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
|
||||
if (client != null) {
|
||||
// notify the server's rpc scheduler that the protocol user has
|
||||
// highest priority. the scheduler should exempt the user from
|
||||
// priority calculations.
|
||||
try {
|
||||
setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to set scheduling priority for " + client, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class VerProtocolImpl {
|
||||
|
|
|
@ -639,6 +639,21 @@ public abstract class Server {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getPriorityLevel(Schedulable e) {
|
||||
return callQueue.getPriorityLevel(e);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getPriorityLevel(UserGroupInformation ugi) {
|
||||
return callQueue.getPriorityLevel(ugi);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setPriorityLevel(UserGroupInformation ugi, int priority) {
|
||||
callQueue.setPriorityLevel(ugi, priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a handle to the rpcMetrics (required in tests)
|
||||
* @return rpc metrics
|
||||
|
|
|
@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
|
|||
return null;
|
||||
}
|
||||
|
||||
return ugi.getUserName();
|
||||
return ugi.getShortUserName();
|
||||
}
|
||||
}
|
|
@ -378,6 +378,24 @@ public final class SecurityUtil {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up the client principal for a given protocol. It searches all known
|
||||
* SecurityInfo providers.
|
||||
* @param protocol the protocol class to get the information for
|
||||
* @param conf configuration object
|
||||
* @return client principal or null if it has no client principal defined.
|
||||
*/
|
||||
public static String getClientPrincipal(Class<?> protocol,
|
||||
Configuration conf) {
|
||||
String user = null;
|
||||
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
|
||||
if (krbInfo != null) {
|
||||
String key = krbInfo.clientPrincipal();
|
||||
user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
|
||||
}
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up the TokenInfo for a given protocol. It searches all known
|
||||
* SecurityInfo providers.
|
||||
|
|
|
@ -99,22 +99,23 @@ public class ServiceAuthorizationManager {
|
|||
}
|
||||
|
||||
// get client principal key to verify (if available)
|
||||
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
|
||||
String clientPrincipal = null;
|
||||
if (krbInfo != null) {
|
||||
String clientKey = krbInfo.clientPrincipal();
|
||||
if (clientKey != null && !clientKey.isEmpty()) {
|
||||
|
||||
|
||||
try {
|
||||
clientPrincipal = SecurityUtil.getServerPrincipal(
|
||||
conf.get(clientKey), addr);
|
||||
clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
|
||||
if (clientPrincipal != null) {
|
||||
|
||||
clientPrincipal = SecurityUtil.getServerPrincipal(clientPrincipal,
|
||||
addr);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw (AuthorizationException) new AuthorizationException(
|
||||
"Can't figure out Kerberos principal name for connection from "
|
||||
+ addr + " for user=" + user + " protocol=" + protocol)
|
||||
.initCause(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
|
||||
acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {
|
||||
String cause = clientPrincipal != null ?
|
||||
|
|
|
@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
|
|||
public class TestDecayRpcScheduler {
|
||||
private Schedulable mockCall(String id) {
|
||||
Schedulable mockCall = mock(Schedulable.class);
|
||||
UserGroupInformation ugi = mock(UserGroupInformation.class);
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
|
||||
|
||||
when(ugi.getUserName()).thenReturn(id);
|
||||
when(mockCall.getUserGroupInformation()).thenReturn(ugi);
|
||||
|
||||
return mockCall;
|
||||
|
|
|
@ -48,6 +48,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.MetricsAsserts;
|
||||
import org.apache.hadoop.test.MockitoUtil;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
@ -1285,6 +1287,43 @@ public class TestRPC extends TestRpcBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout=30000)
|
||||
public void testProtocolUserPriority() throws Exception {
|
||||
final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
|
||||
conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
|
||||
Server server = null;
|
||||
try {
|
||||
server = setupDecayRpcSchedulerandTestServer(ns + ".");
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
|
||||
// normal users start with priority 0.
|
||||
Assert.assertEquals(0, server.getPriorityLevel(ugi));
|
||||
// calls for a protocol defined client will have priority of 0.
|
||||
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
|
||||
|
||||
// protocol defined client will have top priority of -1.
|
||||
ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
|
||||
Assert.assertEquals(-1, server.getPriorityLevel(ugi));
|
||||
// calls for a protocol defined client will have priority of 0.
|
||||
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
|
||||
} finally {
|
||||
stop(server, null);
|
||||
}
|
||||
}
|
||||
|
||||
private static Schedulable newSchedulable(UserGroupInformation ugi) {
|
||||
return new Schedulable(){
|
||||
@Override
|
||||
public UserGroupInformation getUserGroupInformation() {
|
||||
return ugi;
|
||||
}
|
||||
@Override
|
||||
public int getPriorityLevel() {
|
||||
return 0; // doesn't matter.
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Server setupDecayRpcSchedulerandTestServer(String ns)
|
||||
throws Exception {
|
||||
final int queueSizePerHandler = 3;
|
||||
|
|
|
@ -62,6 +62,8 @@ public class TestRpcBase {
|
|||
|
||||
protected final static String SERVER_PRINCIPAL_KEY =
|
||||
"test.ipc.server.principal";
|
||||
protected final static String CLIENT_PRINCIPAL_KEY =
|
||||
"test.ipc.client.principal";
|
||||
protected final static String ADDRESS = "0.0.0.0";
|
||||
protected final static int PORT = 0;
|
||||
protected static InetSocketAddress addr;
|
||||
|
@ -271,7 +273,8 @@ public class TestRpcBase {
|
|||
}
|
||||
}
|
||||
|
||||
@KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
|
||||
@KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
|
||||
clientPrincipal = CLIENT_PRINCIPAL_KEY)
|
||||
@TokenInfo(TestTokenSelector.class)
|
||||
@ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
|
||||
protocolVersion = 1)
|
||||
|
|
Loading…
Reference in New Issue