merge -c 1510772 FIXES: HADOOP-9816. RPC Sasl QOP is broken (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1510774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3cadf85d86
commit
9d28a52b30
|
@ -419,6 +419,8 @@ Release 2.1.0-beta - 2013-08-06
|
||||||
HADOOP-9507. LocalFileSystem rename() is broken in some cases when
|
HADOOP-9507. LocalFileSystem rename() is broken in some cases when
|
||||||
destination exists. (cnauroth)
|
destination exists. (cnauroth)
|
||||||
|
|
||||||
|
HADOOP-9816. RPC Sasl QOP is broken (daryn)
|
||||||
|
|
||||||
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HADOOP-8924. Hadoop Common creating package-info.java must not depend on
|
HADOOP-8924. Hadoop Common creating package-info.java must not depend on
|
||||||
|
|
|
@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
import javax.security.sasl.Sasl;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -85,6 +86,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import com.google.protobuf.CodedOutputStream;
|
import com.google.protobuf.CodedOutputStream;
|
||||||
|
@ -709,6 +711,9 @@ public class Client {
|
||||||
// Sasl connect is successful. Let's set up Sasl i/o streams.
|
// Sasl connect is successful. Let's set up Sasl i/o streams.
|
||||||
inStream = saslRpcClient.getInputStream(inStream);
|
inStream = saslRpcClient.getInputStream(inStream);
|
||||||
outStream = saslRpcClient.getOutputStream(outStream);
|
outStream = saslRpcClient.getOutputStream(outStream);
|
||||||
|
// for testing
|
||||||
|
remoteId.saslQop =
|
||||||
|
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
|
||||||
} else if (UserGroupInformation.isSecurityEnabled() &&
|
} else if (UserGroupInformation.isSecurityEnabled() &&
|
||||||
!fallbackAllowed) {
|
!fallbackAllowed) {
|
||||||
throw new IOException("Server asks us to fall back to SIMPLE " +
|
throw new IOException("Server asks us to fall back to SIMPLE " +
|
||||||
|
@ -1453,6 +1458,7 @@ public class Client {
|
||||||
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
private final boolean doPing; //do we need to send ping message
|
private final boolean doPing; //do we need to send ping message
|
||||||
private final int pingInterval; // how often sends ping to the server in msecs
|
private final int pingInterval; // how often sends ping to the server in msecs
|
||||||
|
private String saslQop; // here for testing
|
||||||
|
|
||||||
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
||||||
UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
|
UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
|
||||||
|
@ -1507,6 +1513,11 @@ public class Client {
|
||||||
return pingInterval;
|
return pingInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
String getSaslQop() {
|
||||||
|
return saslQop;
|
||||||
|
}
|
||||||
|
|
||||||
static ConnectionId getConnectionId(InetSocketAddress addr,
|
static ConnectionId getConnectionId(InetSocketAddress addr,
|
||||||
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
|
|
@ -1274,8 +1274,8 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private RpcSaslProto saslReadAndProcess(DataInputStream dis) throws
|
private void saslReadAndProcess(DataInputStream dis) throws
|
||||||
WrappedRpcServerException, InterruptedException {
|
WrappedRpcServerException, IOException, InterruptedException {
|
||||||
if (saslContextEstablished) {
|
if (saslContextEstablished) {
|
||||||
throw new WrappedRpcServerException(
|
throw new WrappedRpcServerException(
|
||||||
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
||||||
|
@ -1308,8 +1308,6 @@ public abstract class Server {
|
||||||
LOG.debug("SASL server context established. Negotiated QoP is "
|
LOG.debug("SASL server context established. Negotiated QoP is "
|
||||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
||||||
}
|
}
|
||||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
|
||||||
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
|
||||||
user = getAuthorizedUgi(saslServer.getAuthorizationID());
|
user = getAuthorizedUgi(saslServer.getAuthorizationID());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("SASL server successfully authenticated client: " + user);
|
LOG.debug("SASL server successfully authenticated client: " + user);
|
||||||
|
@ -1324,7 +1322,15 @@ public abstract class Server {
|
||||||
throw new WrappedRpcServerException(
|
throw new WrappedRpcServerException(
|
||||||
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
|
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
|
||||||
}
|
}
|
||||||
return saslResponse;
|
// send back response if any, may throw IOException
|
||||||
|
if (saslResponse != null) {
|
||||||
|
doSaslReply(saslResponse);
|
||||||
|
}
|
||||||
|
// do NOT enable wrapping until the last auth response is sent
|
||||||
|
if (saslContextEstablished) {
|
||||||
|
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||||
|
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private RpcSaslProto processSaslMessage(DataInputStream dis)
|
private RpcSaslProto processSaslMessage(DataInputStream dis)
|
||||||
|
@ -1904,11 +1910,7 @@ public abstract class Server {
|
||||||
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
||||||
"SASL protocol not requested by client");
|
"SASL protocol not requested by client");
|
||||||
}
|
}
|
||||||
RpcSaslProto response = saslReadAndProcess(dis);
|
saslReadAndProcess(dis);
|
||||||
// send back response if any, may throw IOException
|
|
||||||
if (response != null) {
|
|
||||||
doSaslReply(response);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throw new WrappedRpcServerException(
|
throw new WrappedRpcServerException(
|
||||||
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
||||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.security.token.TokenInfo;
|
||||||
import org.apache.hadoop.security.token.TokenSelector;
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
import org.apache.hadoop.util.ProtoUtil;
|
import org.apache.hadoop.util.ProtoUtil;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
/**
|
/**
|
||||||
* A utility class that encapsulates SASL logic for RPC client
|
* A utility class that encapsulates SASL logic for RPC client
|
||||||
|
@ -106,6 +107,12 @@ public class SaslRpcClient {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public Object getNegotiatedProperty(String key) {
|
||||||
|
return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate a sasl client for the first supported auth type in the
|
* Instantiate a sasl client for the first supported auth type in the
|
||||||
* given list. The auth type must be defined, enabled, and the user
|
* given list. The auth type must be defined, enabled, and the user
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.lang.annotation.Annotation;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.security.Security;
|
import java.security.Security;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -44,8 +45,6 @@ import javax.security.sasl.SaslClient;
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
import javax.security.sasl.SaslServer;
|
import javax.security.sasl.SaslServer;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -62,11 +61,11 @@ import org.apache.hadoop.security.SaslPlainServer;
|
||||||
import org.apache.hadoop.security.SaslRpcClient;
|
import org.apache.hadoop.security.SaslRpcClient;
|
||||||
import org.apache.hadoop.security.SaslRpcServer;
|
import org.apache.hadoop.security.SaslRpcServer;
|
||||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
|
||||||
import org.apache.hadoop.security.SecurityInfo;
|
import org.apache.hadoop.security.SecurityInfo;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.TestUserGroupInformation;
|
import org.apache.hadoop.security.TestUserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -77,9 +76,28 @@ import org.apache.log4j.Level;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
/** Unit tests for using Sasl over RPC. */
|
/** Unit tests for using Sasl over RPC. */
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestSaslRPC {
|
public class TestSaslRPC {
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||||
|
for (QualityOfProtection qop : QualityOfProtection.values()) {
|
||||||
|
params.add(new Object[]{ qop });
|
||||||
|
}
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
QualityOfProtection expectedQop;
|
||||||
|
|
||||||
|
public TestSaslRPC(QualityOfProtection qop) {
|
||||||
|
expectedQop = qop;
|
||||||
|
}
|
||||||
|
|
||||||
private static final String ADDRESS = "0.0.0.0";
|
private static final String ADDRESS = "0.0.0.0";
|
||||||
|
|
||||||
public static final Log LOG =
|
public static final Log LOG =
|
||||||
|
@ -115,8 +133,12 @@ public class TestSaslRPC {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
LOG.info("---------------------------------");
|
||||||
|
LOG.info("Testing QOP:"+expectedQop);
|
||||||
|
LOG.info("---------------------------------");
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
|
conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
|
||||||
|
conf.set("hadoop.rpc.protection", expectedQop.name().toLowerCase());
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
enableSecretManager = null;
|
enableSecretManager = null;
|
||||||
forceSecretManager = null;
|
forceSecretManager = null;
|
||||||
|
@ -226,15 +248,16 @@ public class TestSaslRPC {
|
||||||
serverPrincipal = SERVER_PRINCIPAL_KEY)
|
serverPrincipal = SERVER_PRINCIPAL_KEY)
|
||||||
@TokenInfo(TestTokenSelector.class)
|
@TokenInfo(TestTokenSelector.class)
|
||||||
public interface TestSaslProtocol extends TestRPC.TestProtocol {
|
public interface TestSaslProtocol extends TestRPC.TestProtocol {
|
||||||
public AuthenticationMethod getAuthMethod() throws IOException;
|
public AuthMethod getAuthMethod() throws IOException;
|
||||||
public String getAuthUser() throws IOException;
|
public String getAuthUser() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestSaslImpl extends TestRPC.TestImpl implements
|
public static class TestSaslImpl extends TestRPC.TestImpl implements
|
||||||
TestSaslProtocol {
|
TestSaslProtocol {
|
||||||
@Override
|
@Override
|
||||||
public AuthenticationMethod getAuthMethod() throws IOException {
|
public AuthMethod getAuthMethod() throws IOException {
|
||||||
return UserGroupInformation.getCurrentUser().getAuthenticationMethod();
|
return UserGroupInformation.getCurrentUser()
|
||||||
|
.getAuthenticationMethod().getAuthMethod();
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public String getAuthUser() throws IOException {
|
public String getAuthUser() throws IOException {
|
||||||
|
@ -341,8 +364,11 @@ public class TestSaslRPC {
|
||||||
try {
|
try {
|
||||||
proxy = RPC.getProxy(TestSaslProtocol.class,
|
proxy = RPC.getProxy(TestSaslProtocol.class,
|
||||||
TestSaslProtocol.versionID, addr, conf);
|
TestSaslProtocol.versionID, addr, conf);
|
||||||
|
AuthMethod authMethod = proxy.getAuthMethod();
|
||||||
|
assertEquals(TOKEN, authMethod);
|
||||||
//QOP must be auth
|
//QOP must be auth
|
||||||
Assert.assertEquals(SaslRpcServer.SASL_PROPS.get(Sasl.QOP), "auth");
|
assertEquals(expectedQop.saslQop,
|
||||||
|
RPC.getConnectionIdForProxy(proxy).getSaslQop());
|
||||||
proxy.ping();
|
proxy.ping();
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -393,6 +419,7 @@ public class TestSaslRPC {
|
||||||
newConf.set(CommonConfigurationKeysPublic.
|
newConf.set(CommonConfigurationKeysPublic.
|
||||||
HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
|
HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");
|
||||||
|
|
||||||
|
Client client = null;
|
||||||
TestSaslProtocol proxy1 = null;
|
TestSaslProtocol proxy1 = null;
|
||||||
TestSaslProtocol proxy2 = null;
|
TestSaslProtocol proxy2 = null;
|
||||||
TestSaslProtocol proxy3 = null;
|
TestSaslProtocol proxy3 = null;
|
||||||
|
@ -402,7 +429,7 @@ public class TestSaslRPC {
|
||||||
proxy1 = RPC.getProxy(TestSaslProtocol.class,
|
proxy1 = RPC.getProxy(TestSaslProtocol.class,
|
||||||
TestSaslProtocol.versionID, addr, newConf);
|
TestSaslProtocol.versionID, addr, newConf);
|
||||||
proxy1.getAuthMethod();
|
proxy1.getAuthMethod();
|
||||||
Client client = WritableRpcEngine.getClient(conf);
|
client = WritableRpcEngine.getClient(newConf);
|
||||||
Set<ConnectionId> conns = client.getConnectionIds();
|
Set<ConnectionId> conns = client.getConnectionIds();
|
||||||
assertEquals("number of connections in cache is wrong", 1, conns.size());
|
assertEquals("number of connections in cache is wrong", 1, conns.size());
|
||||||
// same conf, connection should be re-used
|
// same conf, connection should be re-used
|
||||||
|
@ -428,9 +455,13 @@ public class TestSaslRPC {
|
||||||
assertNotSame(connsArray[2].getMaxIdleTime(), timeouts[1]);
|
assertNotSame(connsArray[2].getMaxIdleTime(), timeouts[1]);
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.stop();
|
||||||
RPC.stopProxy(proxy1);
|
// this is dirty, but clear out connection cache for next run
|
||||||
RPC.stopProxy(proxy2);
|
if (client != null) {
|
||||||
RPC.stopProxy(proxy3);
|
client.getConnectionIds().clear();
|
||||||
|
}
|
||||||
|
if (proxy1 != null) RPC.stopProxy(proxy1);
|
||||||
|
if (proxy2 != null) RPC.stopProxy(proxy2);
|
||||||
|
if (proxy3 != null) RPC.stopProxy(proxy3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -873,14 +904,13 @@ public class TestSaslRPC {
|
||||||
TestSaslProtocol.versionID, addr, clientConf);
|
TestSaslProtocol.versionID, addr, clientConf);
|
||||||
|
|
||||||
proxy.ping();
|
proxy.ping();
|
||||||
// verify sasl completed
|
|
||||||
if (serverAuth != SIMPLE) {
|
|
||||||
assertEquals(SaslRpcServer.SASL_PROPS.get(Sasl.QOP), "auth");
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure the other side thinks we are who we said we are!!!
|
// make sure the other side thinks we are who we said we are!!!
|
||||||
assertEquals(clientUgi.getUserName(), proxy.getAuthUser());
|
assertEquals(clientUgi.getUserName(), proxy.getAuthUser());
|
||||||
return proxy.getAuthMethod().toString();
|
AuthMethod authMethod = proxy.getAuthMethod();
|
||||||
|
// verify sasl completed with correct QOP
|
||||||
|
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
|
||||||
|
RPC.getConnectionIdForProxy(proxy).getSaslQop());
|
||||||
|
return authMethod.toString();
|
||||||
} finally {
|
} finally {
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
RPC.stopProxy(proxy);
|
RPC.stopProxy(proxy);
|
||||||
|
|
Loading…
Reference in New Issue