From 478a25b92938cae5566b50818ff0f33de49934df Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Sun, 20 Mar 2016 17:40:59 -0700 Subject: [PATCH] HADOOP-12819. Migrate TestSaslRPC and related codes to rebase on ProtobufRpcEngine. Contributed by Kai Zheng. --- .../org/apache/hadoop/ipc/TestRpcBase.java | 192 ++++++++- .../org/apache/hadoop/ipc/TestSaslRPC.java | 395 +++++------------- .../security/TestDoAsEffectiveUser.java | 6 +- .../hadoop-common/src/test/proto/test.proto | 9 + .../src/test/proto/test_rpc_service.proto | 4 + ...ache.hadoop.security.token.TokenIdentifier | 2 +- 6 files changed, 305 insertions(+), 303 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index 03fd31ed668..bc604a47ef2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -29,6 +29,22 @@ import org.apache.hadoop.security.token.SecretManager; import org.junit.Assert; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.protobuf.TestProtos; +import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; +import org.junit.Assert; + +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; @@ -37,6 +53,8 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -149,6 +167,89 @@ protected static int countThreads(String search) { return count; } + public static class TestTokenIdentifier extends TokenIdentifier { + private Text tokenid; + private Text realUser; + final static Text KIND_NAME = new Text("test.token"); + + public TestTokenIdentifier() { + this(new Text(), new Text()); + } + public TestTokenIdentifier(Text tokenid) { + this(tokenid, new Text()); + } + public TestTokenIdentifier(Text tokenid, Text realUser) { + this.tokenid = tokenid == null ? new Text() : tokenid; + this.realUser = realUser == null ? new Text() : realUser; + } + @Override + public Text getKind() { + return KIND_NAME; + } + @Override + public UserGroupInformation getUser() { + if (realUser.toString().isEmpty()) { + return UserGroupInformation.createRemoteUser(tokenid.toString()); + } else { + UserGroupInformation realUgi = UserGroupInformation + .createRemoteUser(realUser.toString()); + return UserGroupInformation + .createProxyUser(tokenid.toString(), realUgi); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + tokenid.readFields(in); + realUser.readFields(in); + } + @Override + public void write(DataOutput out) throws IOException { + tokenid.write(out); + realUser.write(out); + } + } + + public static class TestTokenSecretManager extends + SecretManager { + @Override + public byte[] createPassword(TestTokenIdentifier id) { + return id.getBytes(); + } + + @Override + public byte[] retrievePassword(TestTokenIdentifier id) + throws InvalidToken { + return id.getBytes(); + } + + @Override + public TestTokenIdentifier createIdentifier() { + return new TestTokenIdentifier(); + } + } + + public static class TestTokenSelector implements + TokenSelector { + @SuppressWarnings("unchecked") + @Override + public Token selectToken(Text service, + Collection> tokens) { + if (service == null) { + return null; + } + for (Token token : tokens) { + if (TestTokenIdentifier.KIND_NAME.equals(token.getKind()) + && service.equals(token.getService())) { + return (Token) token; + } + } + return null; + } + } + + @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY) + @TokenInfo(TestTokenSelector.class) @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService", protocolVersion = 1) public interface TestRpcService @@ -267,12 +368,80 @@ public TestProtos.EmptyResponseProto sleep( } catch (InterruptedException ignore) {} return TestProtos.EmptyResponseProto.newBuilder().build(); } + + @Override + public TestProtos.AuthMethodResponseProto getAuthMethod( + RpcController controller, TestProtos.EmptyRequestProto request) + throws ServiceException { + AuthMethod authMethod = null; + try { + authMethod = UserGroupInformation.getCurrentUser() + .getAuthenticationMethod().getAuthMethod(); + } catch (IOException e) { + throw new ServiceException(e); + } + + return TestProtos.AuthMethodResponseProto.newBuilder() + .setCode(authMethod.code) + .setMechanismName(authMethod.getMechanismName()) + .build(); + } + + @Override + public TestProtos.AuthUserResponseProto getAuthUser( + RpcController controller, TestProtos.EmptyRequestProto request) + throws ServiceException { + UserGroupInformation authUser = null; + try { + authUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new ServiceException(e); + } + + return TestProtos.AuthUserResponseProto.newBuilder() + .setAuthUser(authUser.getUserName()) + .build(); + } + + @Override + public TestProtos.EchoResponseProto echoPostponed( + RpcController controller, TestProtos.EchoRequestProto request) + throws ServiceException { + Server.Call call = Server.getCurCall().get(); + call.postponeResponse(); + postponedCalls.add(call); + + return TestProtos.EchoResponseProto.newBuilder().setMessage( + request.getMessage()) + .build(); + } + + @Override + public TestProtos.EmptyResponseProto sendPostponed( + RpcController controller, TestProtos.EmptyRequestProto request) + throws ServiceException { + Collections.shuffle(postponedCalls); + try { + for (Server.Call call : postponedCalls) { + call.sendResponse(); + } + } catch (IOException e) { + throw new ServiceException(e); + } + postponedCalls.clear(); + + return TestProtos.EmptyResponseProto.newBuilder().build(); + } } protected static TestProtos.EmptyRequestProto newEmptyRequest() { return TestProtos.EmptyRequestProto.newBuilder().build(); } + protected static TestProtos.EmptyResponseProto newEmptyResponse() { + return TestProtos.EmptyResponseProto.newBuilder().build(); + } + protected static TestProtos.EchoRequestProto newEchoRequest(String msg) { return TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build(); } @@ -290,6 +459,27 @@ protected static TestProtos.SlowPingRequestProto newSlowPingRequest( protected static TestProtos.SleepRequestProto newSleepRequest( int milliSeconds) { return TestProtos.SleepRequestProto.newBuilder() - .setMilliSeconds(milliSeconds).build(); + .setMilliSeconds(milliSeconds).build(); + } + + protected static TestProtos.EchoResponseProto newEchoResponse(String msg) { + return TestProtos.EchoResponseProto.newBuilder().setMessage(msg).build(); + } + + protected static AuthMethod convert( + TestProtos.AuthMethodResponseProto authMethodResponse) { + String mechanism = authMethodResponse.getMechanismName(); + if (mechanism.equals(AuthMethod.SIMPLE.getMechanismName())) { + return AuthMethod.SIMPLE; + } else if (mechanism.equals(AuthMethod.KERBEROS.getMechanismName())) { + return AuthMethod.KERBEROS; + } else if (mechanism.equals(AuthMethod.TOKEN.getMechanismName())) { + return AuthMethod.TOKEN; + } + return null; + } + + protected static String convert(TestProtos.AuthUserResponseProto response) { + return response.getAuthUser(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index 8df3b1ded6e..ec53e8c9762 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -18,53 +18,7 @@ package org.apache.hadoop.ipc; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; -import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.annotation.Annotation; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.security.Security; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - +import com.google.protobuf.ServiceException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,27 +28,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslPlainServer; -import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security.SaslRpcClient; -import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; -import org.apache.hadoop.security.SecurityInfo; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.TestUserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.*; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenInfo; -import org.apache.hadoop.security.token.TokenSelector; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; @@ -104,9 +44,27 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import javax.security.auth.callback.*; +import javax.security.sasl.*; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.security.Security; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*; +import static org.junit.Assert.*; + /** Unit tests for using Sasl over RPC. */ @RunWith(Parameterized.class) -public class TestSaslRPC { +public class TestSaslRPC extends TestRpcBase { @Parameters public static Collection data() { Collection params = new ArrayList(); @@ -135,18 +93,14 @@ public TestSaslRPC(QualityOfProtection[] qop, this.expectedQop = expectedQop; this.saslPropertiesResolver = saslPropertiesResolver; } - - private static final String ADDRESS = "0.0.0.0"; public static final Log LOG = LogFactory.getLog(TestSaslRPC.class); static final String ERROR_MESSAGE = "Token is invalid"; - static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal"; static final String SERVER_KEYTAB_KEY = "test.ipc.server.keytab"; static final String SERVER_PRINCIPAL_1 = "p1/foo@BAR"; - static final String SERVER_PRINCIPAL_2 = "p2/foo@BAR"; - private static Configuration conf; + // If this is set to true AND the auth-method is not simple, secretManager // will be enabled. static Boolean enableSecretManager = null; @@ -155,7 +109,7 @@ public TestSaslRPC(QualityOfProtection[] qop, static Boolean forceSecretManager = null; static Boolean clientFallBackToSimpleAllowed = true; - static enum UseToken { + enum UseToken { NONE(), VALID(), INVALID(), @@ -174,6 +128,7 @@ public void setup() { LOG.info("---------------------------------"); LOG.info("Testing QOP:"+ getQOPNames(qop)); LOG.info("---------------------------------"); + conf = new Configuration(); // the specific tests for kerberos will enable kerberos. forcing it // for all tests will cause tests to fail if the user has a TGT @@ -187,6 +142,9 @@ public void setup() { enableSecretManager = null; forceSecretManager = null; clientFallBackToSimpleAllowed = true; + + // Set RPC engine to protobuf RPC engine + RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); } static String getQOPNames (QualityOfProtection[] qops){ @@ -210,68 +168,6 @@ static String getQOPNames (QualityOfProtection[] qops){ ((Log4JLogger) SecurityUtil.LOG).getLogger().setLevel(Level.ALL); } - public static class TestTokenIdentifier extends TokenIdentifier { - private Text tokenid; - private Text realUser; - final static Text KIND_NAME = new Text("test.token"); - - public TestTokenIdentifier() { - this(new Text(), new Text()); - } - public TestTokenIdentifier(Text tokenid) { - this(tokenid, new Text()); - } - public TestTokenIdentifier(Text tokenid, Text realUser) { - this.tokenid = tokenid == null ? new Text() : tokenid; - this.realUser = realUser == null ? new Text() : realUser; - } - @Override - public Text getKind() { - return KIND_NAME; - } - @Override - public UserGroupInformation getUser() { - if (realUser.toString().isEmpty()) { - return UserGroupInformation.createRemoteUser(tokenid.toString()); - } else { - UserGroupInformation realUgi = UserGroupInformation - .createRemoteUser(realUser.toString()); - return UserGroupInformation - .createProxyUser(tokenid.toString(), realUgi); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - tokenid.readFields(in); - realUser.readFields(in); - } - @Override - public void write(DataOutput out) throws IOException { - tokenid.write(out); - realUser.write(out); - } - } - - public static class TestTokenSecretManager extends - SecretManager { - @Override - public byte[] createPassword(TestTokenIdentifier id) { - return id.getBytes(); - } - - @Override - public byte[] retrievePassword(TestTokenIdentifier id) - throws InvalidToken { - return id.getBytes(); - } - - @Override - public TestTokenIdentifier createIdentifier() { - return new TestTokenIdentifier(); - } - } - public static class BadTokenSecretManager extends TestTokenSecretManager { @Override @@ -281,64 +177,6 @@ public byte[] retrievePassword(TestTokenIdentifier id) } } - public static class TestTokenSelector implements - TokenSelector { - @SuppressWarnings("unchecked") - @Override - public Token selectToken(Text service, - Collection> tokens) { - if (service == null) { - return null; - } - for (Token token : tokens) { - if (TestTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { - return (Token) token; - } - } - return null; - } - } - - @KerberosInfo( - serverPrincipal = SERVER_PRINCIPAL_KEY) - @TokenInfo(TestTokenSelector.class) - public interface TestSaslProtocol extends TestRPC.TestProtocol { - public AuthMethod getAuthMethod() throws IOException; - public String getAuthUser() throws IOException; - public String echoPostponed(String value) throws IOException; - public void sendPostponed() throws IOException; - } - - public static class TestSaslImpl extends TestRPC.TestImpl implements - TestSaslProtocol { - private List postponedCalls = new ArrayList(); - @Override - public AuthMethod getAuthMethod() throws IOException { - return UserGroupInformation.getCurrentUser() - .getAuthenticationMethod().getAuthMethod(); - } - @Override - public String getAuthUser() throws IOException { - return UserGroupInformation.getCurrentUser().getUserName(); - } - @Override - public String echoPostponed(String value) { - Call call = Server.getCurCall().get(); - call.postponeResponse(); - postponedCalls.add(call); - return value; - } - @Override - public void sendPostponed() throws IOException { - Collections.shuffle(postponedCalls); - for (Call call : postponedCalls) { - call.sendResponse(); - } - postponedCalls.clear(); - } - } - public static class CustomSecurityInfo extends SecurityInfo { @Override @@ -363,8 +201,8 @@ public String clientPrincipal() { public TokenInfo getTokenInfo(Class protocol, Configuration conf) { return new TokenInfo() { @Override - public Class> value() { + public Class> value() { return TestTokenSelector.class; } @Override @@ -378,10 +216,7 @@ public Class annotationType() { @Test public void testDigestRpc() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); - final Server server = new RPC.Builder(conf) - .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); + final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } @@ -391,10 +226,7 @@ public void testDigestRpcWithoutAnnotation() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); try { SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo()); - final Server server = new RPC.Builder(conf) - .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5) - .setVerbose(true).setSecretManager(sm).build(); + final Server server = setupTestServer(conf, 5, sm); doDigestRpc(server, sm); } finally { SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); @@ -404,59 +236,47 @@ public void testDigestRpcWithoutAnnotation() throws Exception { @Test public void testErrorMessage() throws Exception { BadTokenSecretManager sm = new BadTokenSecretManager(); - final Server server = new RPC.Builder(conf) - .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); + final Server server = setupTestServer(conf, 5, sm); boolean succeeded = false; try { doDigestRpc(server, sm); - } catch (RemoteException e) { - LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); - assertEquals(ERROR_MESSAGE, e.getLocalizedMessage()); - assertTrue(e.unwrapRemoteException() instanceof InvalidToken); + } catch (ServiceException e) { + assertTrue(e.getCause() instanceof RemoteException); + RemoteException re = (RemoteException) e.getCause(); + LOG.info("LOGGING MESSAGE: " + re.getLocalizedMessage()); + assertEquals(ERROR_MESSAGE, re.getLocalizedMessage()); + assertTrue(re.unwrapRemoteException() instanceof InvalidToken); succeeded = true; } assertTrue(succeeded); } - private void doDigestRpc(Server server, TestTokenSecretManager sm - ) throws Exception { - server.start(); - + private void doDigestRpc(Server server, TestTokenSecretManager sm) + throws Exception { final UserGroupInformation current = UserGroupInformation.getCurrentUser(); - final InetSocketAddress addr = NetUtils.getConnectAddress(server); + addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token(tokenId, - sm); + Token token = new Token(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); - TestSaslProtocol proxy = null; + TestRpcService proxy = null; try { - proxy = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, conf); - AuthMethod authMethod = proxy.getAuthMethod(); + proxy = getClient(addr, conf); + AuthMethod authMethod = convert( + proxy.getAuthMethod(null, newEmptyRequest())); assertEquals(TOKEN, authMethod); //QOP must be auth assertEquals(expectedQop.saslQop, RPC.getConnectionIdForProxy(proxy).getSaslQop()); - proxy.ping(); + proxy.ping(null, newEmptyRequest()); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, proxy); } } - static ConnectionId getConnectionId(Configuration conf) throws IOException { - return ConnectionId.getConnectionId(new InetSocketAddress(0), - TestSaslProtocol.class, null, 0, null, conf); - } - @Test public void testPingInterval() throws Exception { Configuration newConf = new Configuration(conf); @@ -466,29 +286,26 @@ public void testPingInterval() throws Exception { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = getConnectionId(newConf); + ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), + TestRpcService.class, null, 0, null, newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); - remoteId = getConnectionId(newConf); + remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0), + TestRpcService.class, null, 0, null, newConf); assertEquals(0, remoteId.getPingInterval()); } @Test public void testPerConnectionConf() throws Exception { TestTokenSecretManager sm = new TestTokenSecretManager(); - final Server server = new RPC.Builder(conf) - .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .setSecretManager(sm).build(); - server.start(); + final Server server = setupTestServer(conf, 5, sm); final UserGroupInformation current = UserGroupInformation.getCurrentUser(); final InetSocketAddress addr = NetUtils.getConnectAddress(server); TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current .getUserName())); - Token token = new Token(tokenId, - sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); current.addToken(token); @@ -497,28 +314,25 @@ public void testPerConnectionConf() throws Exception { HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, ""); Client client = null; - TestSaslProtocol proxy1 = null; - TestSaslProtocol proxy2 = null; - TestSaslProtocol proxy3 = null; + TestRpcService proxy1 = null; + TestRpcService proxy2 = null; + TestRpcService proxy3 = null; int timeouts[] = {111222, 3333333}; try { newConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, timeouts[0]); - proxy1 = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, newConf); - proxy1.getAuthMethod(); - client = WritableRpcEngine.getClient(newConf); + proxy1 = getClient(addr, newConf); + proxy1.getAuthMethod(null, newEmptyRequest()); + client = ProtobufRpcEngine.getClient(newConf); Set conns = client.getConnectionIds(); assertEquals("number of connections in cache is wrong", 1, conns.size()); // same conf, connection should be re-used - proxy2 = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, newConf); - proxy2.getAuthMethod(); + proxy2 = getClient(addr, newConf); + proxy2.getAuthMethod(null, newEmptyRequest()); assertEquals("number of connections in cache is wrong", 1, conns.size()); // different conf, new connection should be set up newConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, timeouts[1]); - proxy3 = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, newConf); - proxy3.getAuthMethod(); + proxy3 = getClient(addr, newConf); + proxy3.getAuthMethod(null, newEmptyRequest()); assertEquals("number of connections in cache is wrong", 2, conns.size()); // now verify the proxies have the correct connection ids and timeouts ConnectionId[] connsArray = { @@ -551,24 +365,14 @@ static void testKerberosRpc(String principal, String keytab) throws Exception { UserGroupInformation current = UserGroupInformation.getCurrentUser(); System.out.println("UGI: " + current); - Server server = new RPC.Builder(newConf) - .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl()) - .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) - .build(); - TestSaslProtocol proxy = null; + Server server = setupTestServer(newConf, 5); + TestRpcService proxy = null; - server.start(); - - InetSocketAddress addr = NetUtils.getConnectAddress(server); try { - proxy = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, newConf); - proxy.ping(); + proxy = getClient(addr, newConf); + proxy.ping(null, newEmptyRequest()); } finally { - server.stop(); - if (proxy != null) { - RPC.stopProxy(proxy); - } + stop(server, proxy); } System.out.println("Test is successful."); } @@ -887,14 +691,7 @@ public void testSaslResponseOrdering() throws Exception { UserGroupInformation.setConfiguration(conf); TestTokenSecretManager sm = new TestTokenSecretManager(); - Server server = new RPC.Builder(conf) - .setProtocol(TestSaslProtocol.class) - .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(1) // prevents ordering issues when unblocking calls. - .setVerbose(true) - .setSecretManager(sm) - .build(); - server.start(); + Server server = setupTestServer(conf, 1, sm); try { final InetSocketAddress addr = NetUtils.getConnectAddress(server); final UserGroupInformation clientUgi = @@ -903,14 +700,13 @@ public void testSaslResponseOrdering() throws Exception { TestTokenIdentifier tokenId = new TestTokenIdentifier( new Text(clientUgi.getUserName())); - Token token = new Token(tokenId, sm); + Token token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); clientUgi.addToken(token); clientUgi.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - final TestSaslProtocol proxy = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, conf); + final TestRpcService proxy = getClient(addr, conf); final ExecutorService executor = Executors.newCachedThreadPool(); final AtomicInteger count = new AtomicInteger(); try { @@ -922,7 +718,8 @@ public Void run() throws Exception { @Override public Void call() throws Exception { String expect = "future"+count.getAndIncrement(); - String answer = proxy.echoPostponed(expect); + String answer = convert(proxy.echoPostponed(null, + newEchoRequest(expect))); assertEquals(expect, answer); return null; } @@ -939,7 +736,7 @@ public Void call() throws Exception { // only 1 handler ensures that the prior calls are already // postponed. 1 handler also ensures that this call will // timeout if the postponing doesn't work (ie. free up handler) - proxy.sendPostponed(); + proxy.sendPostponed(null, newEmptyRequest()); for (int i=0; i < futures.length; i++) { LOG.info("waiting for future"+i); futures[i].get(); @@ -1009,14 +806,7 @@ private String internalGetAuthMethod( Server server = serverUgi.doAs(new PrivilegedExceptionAction() { @Override public Server run() throws IOException { - Server server = new RPC.Builder(serverConf) - .setProtocol(TestSaslProtocol.class) - .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0) - .setNumHandlers(5).setVerbose(true) - .setSecretManager(serverSm) - .build(); - server.start(); - return server; + return setupTestServer(serverConf, 5, serverSm); } }); @@ -1038,17 +828,17 @@ public Server run() throws IOException { Token token = null; switch (tokenType) { case VALID: - token = new Token(tokenId, sm); + token = new Token<>(tokenId, sm); SecurityUtil.setTokenService(token, addr); break; case INVALID: - token = new Token( + token = new Token<>( tokenId.getBytes(), "bad-password!".getBytes(), tokenId.getKind(), null); SecurityUtil.setTokenService(token, addr); break; case OTHER: - token = new Token(); + token = new Token<>(); break; case NONE: // won't get here } @@ -1060,19 +850,28 @@ public Server run() throws IOException { return clientUgi.doAs(new PrivilegedExceptionAction() { @Override public String run() throws IOException { - TestSaslProtocol proxy = null; + TestRpcService proxy = null; try { - proxy = RPC.getProxy(TestSaslProtocol.class, - TestSaslProtocol.versionID, addr, clientConf); - - proxy.ping(); + proxy = getClient(addr, clientConf); + + proxy.ping(null, newEmptyRequest()); // make sure the other side thinks we are who we said we are!!! - assertEquals(clientUgi.getUserName(), proxy.getAuthUser()); - AuthMethod authMethod = proxy.getAuthMethod(); + assertEquals(clientUgi.getUserName(), + convert(proxy.getAuthUser(null, newEmptyRequest()))); + AuthMethod authMethod = + convert(proxy.getAuthMethod(null, newEmptyRequest())); // verify sasl completed with correct QOP assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null, - RPC.getConnectionIdForProxy(proxy).getSaslQop()); + RPC.getConnectionIdForProxy(proxy).getSaslQop()); return authMethod.toString(); + } catch (ServiceException se) { + if (se.getCause() instanceof RemoteException) { + throw (RemoteException) se.getCause(); + } else if (se.getCause() instanceof IOException) { + throw (IOException) se.getCause(); + } else { + throw new RuntimeException(se.getCause()); + } } finally { if (proxy != null) { RPC.stopProxy(proxy); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index b44fa8b85af..50d389c6465 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -41,9 +41,9 @@ import org.apache.hadoop.security.token.TokenInfo; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager; -import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier; -import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSelector; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier; +import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector; import org.apache.commons.logging.*; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto index ba0038d0d15..99cd93d711c 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto @@ -81,4 +81,13 @@ message ExchangeRequestProto { message ExchangeResponseProto { repeated int32 values = 1; +} + +message AuthMethodResponseProto { + required int32 code = 1; + required string mechanismName = 2; +} + +message AuthUserResponseProto { + required string authUser = 1; } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto index abb38831e5f..32921158857 100644 --- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto +++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto @@ -39,6 +39,10 @@ service TestProtobufRpcProto { rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto); rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto); rpc sleep(SleepRequestProto) returns (EmptyResponseProto); + rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto); + rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto); + rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto); + rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto); } service TestProtobufRpc2Proto { diff --git a/hadoop-common-project/hadoop-common/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-common-project/hadoop-common/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier index 56eab0553d2..036b9b4d3ed 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier +++ b/hadoop-common-project/hadoop-common/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -11,5 +11,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.hadoop.ipc.TestSaslRPC$TestTokenIdentifier +org.apache.hadoop.ipc.TestRpcBase$TestTokenIdentifier org.apache.hadoop.security.token.delegation.TestDelegationToken$TestDelegationTokenIdentifier