HADOOP-8736. Add Builder for building RPC server. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379652 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-08-31 22:40:51 +00:00
parent c76a7893f9
commit 4911d9b1ff
11 changed files with 270 additions and 83 deletions

View File

@ -101,6 +101,8 @@ Trunk (unreleased changes)
HADOOP-8619. WritableComparator must implement no-arg constructor. HADOOP-8619. WritableComparator must implement no-arg constructor.
(Chris Douglas via Suresh) (Chris Douglas via Suresh)
HADOOP-8736. Add Builder for building RPC server. (Brandon Li via Suresh)
BUG FIXES BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName. HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.

View File

@ -55,11 +55,10 @@ public class ZKFCRpcServer implements ZKFCProtocol {
new ZKFCProtocolServerSideTranslatorPB(this); new ZKFCProtocolServerSideTranslatorPB(this);
BlockingService service = ZKFCProtocolService BlockingService service = ZKFCProtocolService
.newReflectiveBlockingService(translator); .newReflectiveBlockingService(translator);
this.server = RPC.getServer( this.server = new RPC.Builder(conf).setProtocol(ZKFCProtocolPB.class)
ZKFCProtocolPB.class, .setInstance(service).setBindAddress(bindAddr.getHostName())
service, bindAddr.getHostName(), .setPort(bindAddr.getPort()).setNumHandlers(HANDLER_COUNT)
bindAddr.getPort(), HANDLER_COUNT, false, conf, .setVerbose(false).build();
null /*secretManager*/);
// set service-level authorization security policy // set service-level authorization security policy
if (conf.getBoolean( if (conf.getBoolean(

View File

@ -713,6 +713,110 @@ public class RPC {
null); null);
} }
/**
* Class to construct instances of RPC server with specific options.
*/
public static class Builder {
private Class<?> protocol = null;
private Object instance = null;
private String bindAddress = "0.0.0.0";
private int port = 0;
private int numHandlers = 1;
private int numReaders = -1;
private int queueSizePerHandler = -1;
private boolean verbose = false;
private final Configuration conf;
private SecretManager<? extends TokenIdentifier> secretManager = null;
private String portRangeConfig = null;
public Builder(Configuration conf) {
this.conf = conf;
}
/** Mandatory field */
public Builder setProtocol(Class<?> protocol) {
this.protocol = protocol;
return this;
}
/** Mandatory field */
public Builder setInstance(Object instance) {
this.instance = instance;
return this;
}
/** Default: 0.0.0.0 */
public Builder setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
return this;
}
/** Default: 0 */
public Builder setPort(int port) {
this.port = port;
return this;
}
/** Default: 1 */
public Builder setNumHandlers(int numHandlers) {
this.numHandlers = numHandlers;
return this;
}
/** Default: -1 */
public Builder setnumReaders(int numReaders) {
this.numReaders = numReaders;
return this;
}
/** Default: -1 */
public Builder setQueueSizePerHandler(int queueSizePerHandler) {
this.queueSizePerHandler = queueSizePerHandler;
return this;
}
/** Default: false */
public Builder setVerbose(boolean verbose) {
this.verbose = verbose;
return this;
}
/** Default: null */
public Builder setSecretManager(
SecretManager<? extends TokenIdentifier> secretManager) {
this.secretManager = secretManager;
return this;
}
/** Default: null */
public Builder setPortRangeConfig(String portRangeConfig) {
this.portRangeConfig = portRangeConfig;
return this;
}
/**
* Build the RPC Server.
* @throws IOException on error
* @throws HadoopIllegalArgumentException when mandatory fields are not set
*/
public Server build() throws IOException, HadoopIllegalArgumentException {
if (this.conf == null) {
throw new HadoopIllegalArgumentException("conf is not set");
}
if (this.protocol == null) {
throw new HadoopIllegalArgumentException("protocol is not set");
}
if (this.instance == null) {
throw new HadoopIllegalArgumentException("instance is not set");
}
return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
}
/** An RPC Server. */ /** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server { public abstract static class Server extends org.apache.hadoop.ipc.Server {
boolean verbose; boolean verbose;

View File

@ -165,8 +165,10 @@ public class MiniRPCBenchmark {
new TestDelegationTokenSecretManager(24*60*60*1000, new TestDelegationTokenSecretManager(24*60*60*1000,
7*24*60*60*1000,24*60*60*1000,3600000); 7*24*60*60*1000,24*60*60*1000,3600000);
secretManager.startThreads(); secretManager.startThreads();
rpcServer = RPC.getServer(MiniProtocol.class, rpcServer = new RPC.Builder(conf).setProtocol(MiniProtocol.class)
this, DEFAULT_SERVER_ADDRESS, 0, 1, false, conf, secretManager); .setInstance(this).setBindAddress(DEFAULT_SERVER_ADDRESS).setPort(0)
.setNumHandlers(1).setVerbose(false).setSecretManager(secretManager)
.build();
rpcServer.start(); rpcServer.start();
} }

View File

@ -227,11 +227,14 @@ public class RPCCallBenchmark implements Tool, Configurable {
BlockingService service = TestProtobufRpcProto BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(serverImpl); .newReflectiveBlockingService(serverImpl);
server = RPC.getServer(TestRpcService.class, service, server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
opts.host, opts.port, opts.serverThreads, false, conf, null); .setInstance(service).setBindAddress(opts.host).setPort(opts.port)
.setNumHandlers(opts.serverThreads).setVerbose(false).build();
} else if (opts.rpcEngine == WritableRpcEngine.class) { } else if (opts.rpcEngine == WritableRpcEngine.class) {
server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(), server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
opts.host, opts.port, opts.serverThreads, false, conf, null); .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
.setPort(opts.port).setNumHandlers(opts.serverThreads)
.setVerbose(false).build();
} else { } else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine); throw new RuntimeException("Bad engine: " + opts.rpcEngine);
} }

View File

@ -175,8 +175,9 @@ public class TestMultipleProtocolServer {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
// create a server with two handlers // create a server with two handlers
server = RPC.getServer(Foo0.class, server = new RPC.Builder(conf).setProtocol(Foo0.class)
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
@ -263,8 +264,9 @@ public class TestMultipleProtocolServer {
@Test(expected=IOException.class) @Test(expected=IOException.class)
public void testIncorrectServerCreation() throws IOException { public void testIncorrectServerCreation() throws IOException {
RPC.getServer(Foo1.class, new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
.build();
} }
// Now test a PB service - a server hosts both PB and Writable Rpcs. // Now test a PB service - a server hosts both PB and Writable Rpcs.

View File

@ -113,7 +113,8 @@ public class TestProtoBufRpc {
.newReflectiveBlockingService(serverImpl); .newReflectiveBlockingService(serverImpl);
// Get RPC server for server side implementation // Get RPC server for server side implementation
server = RPC.getServer(TestRpcService.class, service, ADDRESS, PORT, conf); server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
// now the second protocol // now the second protocol

View File

@ -314,8 +314,9 @@ public class TestRPC {
@Test @Test
public void testConfRpc() throws Exception { public void testConfRpc() throws Exception {
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 1, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(1).setVerbose(false).build();
// Just one handler // Just one handler
int confQ = conf.getInt( int confQ = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
@ -328,8 +329,11 @@ public class TestRPC {
assertEquals(confReaders, server.getNumReaders()); assertEquals(confReaders, server.getNumReaders());
server.stop(); server.stop();
server = RPC.getServer(TestProtocol.class, server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200)
.setVerbose(false).build();
assertEquals(3, server.getNumReaders()); assertEquals(3, server.getNumReaders());
assertEquals(200, server.getMaxQueueSize()); assertEquals(200, server.getMaxQueueSize());
server.stop(); server.stop();
@ -337,8 +341,8 @@ public class TestRPC {
@Test @Test
public void testProxyAddress() throws Exception { public void testProxyAddress() throws Exception {
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, conf); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
TestProtocol proxy = null; TestProtocol proxy = null;
try { try {
@ -362,8 +366,10 @@ public class TestRPC {
public void testSlowRpc() throws Exception { public void testSlowRpc() throws Exception {
System.out.println("Testing Slow RPC"); System.out.println("Testing Slow RPC");
// create a server with two handlers // create a server with two handlers
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
TestProtocol proxy = null; TestProtocol proxy = null;
try { try {
@ -409,8 +415,8 @@ public class TestRPC {
} }
private void testCallsInternal(Configuration conf) throws Exception { private void testCallsInternal(Configuration conf) throws Exception {
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, conf); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
TestProtocol proxy = null; TestProtocol proxy = null;
try { try {
server.start(); server.start();
@ -528,8 +534,9 @@ public class TestRPC {
} }
private void doRPCs(Configuration conf, boolean expectFailure) throws Exception { private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 5, true, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
server.refreshServiceAcl(conf, new TestPolicyProvider()); server.refreshServiceAcl(conf, new TestPolicyProvider());
@ -573,8 +580,9 @@ public class TestRPC {
@Test @Test
public void testServerAddress() throws IOException { public void testServerAddress() throws IOException {
Server server = RPC.getServer(TestProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 5, true, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
InetSocketAddress bindAddr = null; InetSocketAddress bindAddr = null;
try { try {
bindAddr = NetUtils.getConnectAddress(server); bindAddr = NetUtils.getConnectAddress(server);
@ -668,8 +676,9 @@ public class TestRPC {
@Test @Test
public void testErrorMsgForInsecureClient() throws Exception { public void testErrorMsgForInsecureClient() throws Exception {
final Server server = RPC.getServer(TestProtocol.class, final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 5, true, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
server.enableSecurity(); server.enableSecurity();
server.start(); server.start();
boolean succeeded = false; boolean succeeded = false;
@ -693,8 +702,10 @@ public class TestRPC {
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
final Server multiServer = RPC.getServer(TestProtocol.class, final Server multiServer = new RPC.Builder(conf)
new TestImpl(), ADDRESS, 0, 5, true, conf, null); .setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.build();
multiServer.enableSecurity(); multiServer.enableSecurity();
multiServer.start(); multiServer.start();
succeeded = false; succeeded = false;
@ -748,8 +759,9 @@ public class TestRPC {
assertEquals("Expect no Reader threads running before test", assertEquals("Expect no Reader threads running before test",
0, threadsBefore); 0, threadsBefore);
final Server server = RPC.getServer(TestProtocol.class, final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
new TestImpl(), ADDRESS, 0, 5, true, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
server.start(); server.start();
try { try {
int threadsRunning = countThreads("Server$Listener$Reader"); int threadsRunning = countThreads("Server$Listener$Reader");
@ -762,6 +774,42 @@ public class TestRPC {
0, threadsAfter); 0, threadsAfter);
} }
@Test
public void testRPCBuilder() throws Exception {
// Test mandatory field conf
try {
new RPC.Builder(null).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
fail("Didn't throw HadoopIllegalArgumentException");
} catch (Exception e) {
if (!(e instanceof HadoopIllegalArgumentException)) {
fail("Expecting HadoopIllegalArgumentException but caught " + e);
}
}
// Test mandatory field protocol
try {
new RPC.Builder(conf).setInstance(new TestImpl()).setBindAddress(ADDRESS)
.setPort(0).setNumHandlers(5).setVerbose(true).build();
fail("Didn't throw HadoopIllegalArgumentException");
} catch (Exception e) {
if (!(e instanceof HadoopIllegalArgumentException)) {
fail("Expecting HadoopIllegalArgumentException but caught " + e);
}
}
// Test mandatory field instance
try {
new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5)
.setVerbose(true).build();
fail("Didn't throw HadoopIllegalArgumentException");
} catch (Exception e) {
if (!(e instanceof HadoopIllegalArgumentException)) {
fail("Expecting HadoopIllegalArgumentException but caught " + e);
}
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf); new TestRPC().testCallsInternal(conf);

View File

@ -131,8 +131,9 @@ public class TestRPCCompatibility {
public void testVersion0ClientVersion1Server() throws Exception { public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers // create a server with two handlers
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
impl, ADDRESS, 0, 2, false, conf, null); .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -147,8 +148,9 @@ public class TestRPCCompatibility {
@Test // old client vs new server @Test // old client vs new server
public void testVersion1ClientVersion0Server() throws Exception { public void testVersion1ClientVersion0Server() throws Exception {
// create a server with two handlers // create a server with two handlers
server = RPC.getServer(TestProtocol0.class, server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
new TestImpl0(), ADDRESS, 0, 2, false, conf, null); .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -198,8 +200,9 @@ System.out.println("echo int is NOT supported");
public void testVersion2ClientVersion1Server() throws Exception { public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers // create a server with two handlers
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
impl, ADDRESS, 0, 2, false, conf, null); .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -219,8 +222,9 @@ System.out.println("echo int is NOT supported");
ProtocolSignature.resetCache(); ProtocolSignature.resetCache();
// create a server with two handlers // create a server with two handlers
TestImpl2 impl = new TestImpl2(); TestImpl2 impl = new TestImpl2();
server = RPC.getServer(TestProtocol2.class, server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
impl, ADDRESS, 0, 2, false, conf, null); .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -290,8 +294,9 @@ System.out.println("echo int is NOT supported");
@Test @Test
public void testVersionMismatch() throws IOException { public void testVersionMismatch() throws IOException {
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
false, conf, null); .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -308,8 +313,9 @@ System.out.println("echo int is NOT supported");
@Test @Test
public void testIsMethodSupported() throws IOException { public void testIsMethodSupported() throws IOException {
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
false, conf, null); .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -332,8 +338,9 @@ System.out.println("echo int is NOT supported");
@Test @Test
public void testProtocolMetaInfoSSTranslatorPB() throws Exception { public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
conf, null); .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
.setVerbose(false).build();
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();

View File

@ -235,8 +235,10 @@ public class TestSaslRPC {
@Test @Test
public void testDigestRpc() throws Exception { public void testDigestRpc() throws Exception {
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
final Server server = RPC.getServer(TestSaslProtocol.class, final Server server = new RPC.Builder(conf)
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm); .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
doDigestRpc(server, sm); doDigestRpc(server, sm);
} }
@ -246,9 +248,10 @@ public class TestSaslRPC {
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
try { try {
SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo()); SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
final Server server = RPC.getServer(TestSaslProtocol.class, final Server server = new RPC.Builder(conf)
new TestSaslImpl(), ADDRESS, 0, 5, .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
true, conf, sm); .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5)
.setVerbose(true).setSecretManager(sm).build();
doDigestRpc(server, sm); doDigestRpc(server, sm);
} finally { } finally {
SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
@ -257,8 +260,9 @@ public class TestSaslRPC {
@Test @Test
public void testSecureToInsecureRpc() throws Exception { public void testSecureToInsecureRpc() throws Exception {
Server server = RPC.getServer(TestSaslProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, null); .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
server.disableSecurity(); server.disableSecurity();
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
doDigestRpc(server, sm); doDigestRpc(server, sm);
@ -267,8 +271,10 @@ public class TestSaslRPC {
@Test @Test
public void testErrorMessage() throws Exception { public void testErrorMessage() throws Exception {
BadTokenSecretManager sm = new BadTokenSecretManager(); BadTokenSecretManager sm = new BadTokenSecretManager();
final Server server = RPC.getServer(TestSaslProtocol.class, final Server server = new RPC.Builder(conf)
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm); .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
boolean succeeded = false; boolean succeeded = false;
try { try {
@ -355,8 +361,10 @@ public class TestSaslRPC {
@Test @Test
public void testPerConnectionConf() throws Exception { public void testPerConnectionConf() throws Exception {
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
final Server server = RPC.getServer(TestSaslProtocol.class, final Server server = new RPC.Builder(conf)
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm); .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start(); server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser(); final UserGroupInformation current = UserGroupInformation.getCurrentUser();
final InetSocketAddress addr = NetUtils.getConnectAddress(server); final InetSocketAddress addr = NetUtils.getConnectAddress(server);
@ -418,8 +426,10 @@ public class TestSaslRPC {
UserGroupInformation current = UserGroupInformation.getCurrentUser(); UserGroupInformation current = UserGroupInformation.getCurrentUser();
System.out.println("UGI: " + current); System.out.println("UGI: " + current);
Server server = RPC.getServer(TestSaslProtocol.class, new TestSaslImpl(), Server server = new RPC.Builder(newConf)
ADDRESS, 0, 5, true, newConf, null); .setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.build();
TestSaslProtocol proxy = null; TestSaslProtocol proxy = null;
server.start(); server.start();
@ -441,8 +451,9 @@ public class TestSaslRPC {
@Test @Test
public void testDigestAuthMethod() throws Exception { public void testDigestAuthMethod() throws Exception {
TestTokenSecretManager sm = new TestTokenSecretManager(); TestTokenSecretManager sm = new TestTokenSecretManager();
Server server = RPC.getServer(TestSaslProtocol.class, Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm); .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
server.start(); server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser(); final UserGroupInformation current = UserGroupInformation.getCurrentUser();

View File

@ -155,8 +155,9 @@ public class TestDoAsEffectiveUser {
conf.setStrings(ProxyUsers conf.setStrings(ProxyUsers
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 5, true, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
refreshConf(conf); refreshConf(conf);
try { try {
@ -197,8 +198,9 @@ public class TestDoAsEffectiveUser {
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1"); "group1");
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
refreshConf(conf); refreshConf(conf);
try { try {
@ -244,8 +246,9 @@ public class TestDoAsEffectiveUser {
"20.20.20.20"); //Authorized IP address "20.20.20.20"); //Authorized IP address
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1"); "group1");
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
refreshConf(conf); refreshConf(conf);
@ -286,8 +289,9 @@ public class TestDoAsEffectiveUser {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
conf.setStrings(ProxyUsers conf.setStrings(ProxyUsers
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
try { try {
server.start(); server.start();
@ -325,8 +329,9 @@ public class TestDoAsEffectiveUser {
public void testRealUserGroupNotSpecified() throws IOException { public void testRealUserGroupNotSpecified() throws IOException {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
try { try {
server.start(); server.start();
@ -366,9 +371,9 @@ public class TestDoAsEffectiveUser {
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3"); "group3");
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
0, 2, false, conf, null); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(2).setVerbose(false).build();
try { try {
server.start(); server.start();
@ -414,8 +419,9 @@ public class TestDoAsEffectiveUser {
conf conf
.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); .set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
final Server server = RPC.getServer(TestProtocol.class, new TestImpl(), final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
ADDRESS, 0, 5, true, conf, sm); .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
server.start(); server.start();
@ -468,8 +474,10 @@ public class TestDoAsEffectiveUser {
newConf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, newConf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
"kerberos"); "kerberos");
UserGroupInformation.setConfiguration(newConf); UserGroupInformation.setConfiguration(newConf);
final Server server = RPC.getServer(TestProtocol.class, new TestImpl(), final Server server = new RPC.Builder(newConf)
ADDRESS, 0, 5, true, newConf, sm); .setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(sm).build();
server.start(); server.start();