HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).

This commit is contained in:
Eric Payne 2020-11-23 20:37:33 +00:00
parent 7ca539bc1b
commit 8459f1d955
10 changed files with 167 additions and 29 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -208,6 +209,19 @@ public class CallQueueManager<E extends Schedulable>
return scheduler.getPriorityLevel(e); return scheduler.getPriorityLevel(e);
} }
int getPriorityLevel(UserGroupInformation user) {
if (scheduler instanceof DecayRpcScheduler) {
return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
}
return 0;
}
void setPriorityLevel(UserGroupInformation user, int priority) {
if (scheduler instanceof DecayRpcScheduler) {
((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
}
}
void setClientBackoffEnabled(boolean value) { void setClientBackoffEnabled(boolean value) {
clientBackOffEnabled = value; clientBackOffEnabled = value;
} }

View File

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -39,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDoubleArray; import com.google.common.util.concurrent.AtomicDoubleArray;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -179,6 +181,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private MetricsProxy metricsProxy; private MetricsProxy metricsProxy;
private final CostProvider costProvider; private final CostProvider costProvider;
private final Map<String, Integer> staticPriorities = new HashMap<>();
/** /**
* This TimerTask will call decayCurrentCosts until * This TimerTask will call decayCurrentCosts until
* the scheduler has been garbage collected. * the scheduler has been garbage collected.
@ -486,7 +489,7 @@ public class DecayRpcScheduler implements RpcScheduler,
AtomicLong value = entry.getValue().get(0); AtomicLong value = entry.getValue().get(0);
long snapshot = value.get(); long snapshot = value.get();
int computedLevel = computePriorityLevel(snapshot); int computedLevel = computePriorityLevel(snapshot, id);
nextCache.put(id, computedLevel); nextCache.put(id, computedLevel);
} }
@ -536,7 +539,11 @@ public class DecayRpcScheduler implements RpcScheduler,
* @param cost the cost for an identity * @param cost the cost for an identity
* @return scheduling decision from 0 to numLevels - 1 * @return scheduling decision from 0 to numLevels - 1
*/ */
private int computePriorityLevel(long cost) { private int computePriorityLevel(long cost, Object identity) {
Integer staticPriority = staticPriorities.get(identity);
if (staticPriority != null) {
return staticPriority.intValue();
}
long totalCallSnapshot = totalDecayedCallCost.get(); long totalCallSnapshot = totalDecayedCallCost.get();
double proportion = 0; double proportion = 0;
@ -576,11 +583,20 @@ public class DecayRpcScheduler implements RpcScheduler,
// Cache was no good, compute it // Cache was no good, compute it
List<AtomicLong> costList = callCosts.get(identity); List<AtomicLong> costList = callCosts.get(identity);
long currentCost = costList == null ? 0 : costList.get(0).get(); long currentCost = costList == null ? 0 : costList.get(0).get();
int priority = computePriorityLevel(currentCost); int priority = computePriorityLevel(currentCost, identity);
LOG.debug("compute priority for {} priority {}", identity, priority); LOG.debug("compute priority for {} priority {}", identity, priority);
return priority; return priority;
} }
private String getIdentity(Schedulable obj) {
String identity = this.identityProvider.makeIdentity(obj);
if (identity == null) {
// Identity provider did not handle this
identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
}
return identity;
}
/** /**
* Compute the appropriate priority for a schedulable based on past requests. * Compute the appropriate priority for a schedulable based on past requests.
* @param obj the schedulable obj to query and remember * @param obj the schedulable obj to query and remember
@ -589,15 +605,42 @@ public class DecayRpcScheduler implements RpcScheduler,
@Override @Override
public int getPriorityLevel(Schedulable obj) { public int getPriorityLevel(Schedulable obj) {
// First get the identity // First get the identity
String identity = this.identityProvider.makeIdentity(obj); String identity = getIdentity(obj);
if (identity == null) { // highest priority users may have a negative priority but their
// Identity provider did not handle this // calls will be priority 0.
identity = DECAYSCHEDULER_UNKNOWN_IDENTITY; return Math.max(0, cachedOrComputedPriorityLevel(identity));
} }
@VisibleForTesting
int getPriorityLevel(UserGroupInformation ugi) {
String identity = getIdentity(newSchedulable(ugi));
// returns true priority of the user.
return cachedOrComputedPriorityLevel(identity); return cachedOrComputedPriorityLevel(identity);
} }
@VisibleForTesting
void setPriorityLevel(UserGroupInformation ugi, int priority) {
String identity = getIdentity(newSchedulable(ugi));
priority = Math.min(numLevels - 1, priority);
LOG.info("Setting priority for user:" + identity + "=" + priority);
staticPriorities.put(identity, priority);
}
// dummy instance to conform to identity provider api.
private static Schedulable newSchedulable(UserGroupInformation ugi) {
return new Schedulable() {
@Override
public UserGroupInformation getUserGroupInformation() {
return ugi;
}
@Override
public int getPriorityLevel() {
return 0;
}
};
}
@Override @Override
public boolean shouldBackOff(Schedulable obj) { public boolean shouldBackOff(Schedulable obj) {
Boolean backOff = false; Boolean backOff = false;

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -980,7 +981,18 @@ public class RPC {
" ProtocolImpl=" + protocolImpl.getClass().getName() + " ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName()); " protocolClass=" + protocolClass.getName());
} }
} String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
if (client != null) {
// notify the server's rpc scheduler that the protocol user has
// highest priority. the scheduler should exempt the user from
// priority calculations.
try {
setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
} catch (Exception ex) {
LOG.warn("Failed to set scheduling priority for " + client, ex);
}
}
}
static class VerProtocolImpl { static class VerProtocolImpl {
final long version; final long version;

View File

@ -644,6 +644,21 @@ public abstract class Server {
} }
} }
@VisibleForTesting
int getPriorityLevel(Schedulable e) {
return callQueue.getPriorityLevel(e);
}
@VisibleForTesting
int getPriorityLevel(UserGroupInformation ugi) {
return callQueue.getPriorityLevel(ugi);
}
@VisibleForTesting
void setPriorityLevel(UserGroupInformation ugi, int priority) {
callQueue.setPriorityLevel(ugi, priority);
}
/** /**
* Returns a handle to the rpcMetrics (required in tests) * Returns a handle to the rpcMetrics (required in tests)
* @return rpc metrics * @return rpc metrics

View File

@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
return null; return null;
} }
return ugi.getUserName(); return ugi.getShortUserName();
} }
} }

View File

@ -381,6 +381,24 @@ public final class SecurityUtil {
return null; return null;
} }
/**
* Look up the client principal for a given protocol. It searches all known
* SecurityInfo providers.
* @param protocol the protocol class to get the information for
* @param conf configuration object
* @return client principal or null if it has no client principal defined.
*/
public static String getClientPrincipal(Class<?> protocol,
Configuration conf) {
String user = null;
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
if (krbInfo != null) {
String key = krbInfo.clientPrincipal();
user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
}
return user;
}
/** /**
* Look up the TokenInfo for a given protocol. It searches all known * Look up the TokenInfo for a given protocol. It searches all known
* SecurityInfo providers. * SecurityInfo providers.

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.MachineList; import org.apache.hadoop.util.MachineList;
@ -101,21 +100,19 @@ public class ServiceAuthorizationManager {
String clientPrincipal = null; String clientPrincipal = null;
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
// get client principal key to verify (if available) // get client principal key to verify (if available)
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf); clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
if (krbInfo != null) { try {
String clientKey = krbInfo.clientPrincipal(); if (clientPrincipal != null) {
if (clientKey != null && !clientKey.isEmpty()) { clientPrincipal =
try { SecurityUtil.getServerPrincipal(clientPrincipal, addr);
clientPrincipal = SecurityUtil.getServerPrincipal(
conf.get(clientKey), addr);
} catch (IOException e) {
throw (AuthorizationException) new AuthorizationException(
"Can't figure out Kerberos principal name for connection from "
+ addr + " for user=" + user + " protocol=" + protocol)
.initCause(e);
}
} }
} catch (IOException e) {
throw (AuthorizationException) new AuthorizationException(
"Can't figure out Kerberos principal name for connection from "
+ addr + " for user=" + user + " protocol=" + protocol)
.initCause(e);
} }
} }
if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) || if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) { acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {

View File

@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
public class TestDecayRpcScheduler { public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) { private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class); Schedulable mockCall = mock(Schedulable.class);
UserGroupInformation ugi = mock(UserGroupInformation.class); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
when(ugi.getUserName()).thenReturn(id);
when(mockCall.getUserGroupInformation()).thenReturn(ugi); when(mockCall.getUserGroupInformation()).thenReturn(ugi);
return mockCall; return mockCall;

View File

@ -1294,6 +1294,43 @@ public class TestRPC extends TestRpcBase {
} }
} }
@Test (timeout=30000)
public void testProtocolUserPriority() throws Exception {
final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
Server server = null;
try {
server = setupDecayRpcSchedulerandTestServer(ns + ".");
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
// normal users start with priority 0.
Assert.assertEquals(0, server.getPriorityLevel(ugi));
// calls for a protocol defined client will have priority of 0.
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
// protocol defined client will have top priority of -1.
ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
Assert.assertEquals(-1, server.getPriorityLevel(ugi));
// calls for a protocol defined client will have priority of 0.
Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
} finally {
stop(server, null);
}
}
private static Schedulable newSchedulable(UserGroupInformation ugi) {
return new Schedulable(){
@Override
public UserGroupInformation getUserGroupInformation() {
return ugi;
}
@Override
public int getPriorityLevel() {
return 0; // doesn't matter.
}
};
}
private Server setupDecayRpcSchedulerandTestServer(String ns) private Server setupDecayRpcSchedulerandTestServer(String ns)
throws Exception { throws Exception {
final int queueSizePerHandler = 3; final int queueSizePerHandler = 3;

View File

@ -62,6 +62,8 @@ public class TestRpcBase {
protected final static String SERVER_PRINCIPAL_KEY = protected final static String SERVER_PRINCIPAL_KEY =
"test.ipc.server.principal"; "test.ipc.server.principal";
protected final static String CLIENT_PRINCIPAL_KEY =
"test.ipc.client.principal";
protected final static String ADDRESS = "0.0.0.0"; protected final static String ADDRESS = "0.0.0.0";
protected final static int PORT = 0; protected final static int PORT = 0;
protected static InetSocketAddress addr; protected static InetSocketAddress addr;
@ -271,7 +273,8 @@ public class TestRpcBase {
} }
} }
@KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY) @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
clientPrincipal = CLIENT_PRINCIPAL_KEY)
@TokenInfo(TestTokenSelector.class) @TokenInfo(TestTokenSelector.class)
@ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService", @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
protocolVersion = 1) protocolVersion = 1)