HADOOP-8736. Merging change r1379652 from trunk to branch-2. Contributed by Brandon Li.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1388575 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
72ce7d1945
commit
59d968e642
|
@ -101,8 +101,6 @@ Trunk (Unreleased)
|
||||||
HADOOP-8619. WritableComparator must implement no-arg constructor.
|
HADOOP-8619. WritableComparator must implement no-arg constructor.
|
||||||
(Chris Douglas via Suresh)
|
(Chris Douglas via Suresh)
|
||||||
|
|
||||||
HADOOP-8736. Add Builder for building RPC server. (Brandon Li via Suresh)
|
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
|
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
|
||||||
|
@ -374,6 +372,8 @@ Release 2.0.2-alpha - 2012-09-07
|
||||||
HADOOP-8819. Incorrectly & is used instead of && in some file system
|
HADOOP-8819. Incorrectly & is used instead of && in some file system
|
||||||
implementations. (Brandon Li via suresh)
|
implementations. (Brandon Li via suresh)
|
||||||
|
|
||||||
|
HADOOP-8736. Add Builder for building RPC server. (Brandon Li via Suresh)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
|
HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
|
||||||
|
|
|
@ -55,11 +55,10 @@ public class ZKFCRpcServer implements ZKFCProtocol {
|
||||||
new ZKFCProtocolServerSideTranslatorPB(this);
|
new ZKFCProtocolServerSideTranslatorPB(this);
|
||||||
BlockingService service = ZKFCProtocolService
|
BlockingService service = ZKFCProtocolService
|
||||||
.newReflectiveBlockingService(translator);
|
.newReflectiveBlockingService(translator);
|
||||||
this.server = RPC.getServer(
|
this.server = new RPC.Builder(conf).setProtocol(ZKFCProtocolPB.class)
|
||||||
ZKFCProtocolPB.class,
|
.setInstance(service).setBindAddress(bindAddr.getHostName())
|
||||||
service, bindAddr.getHostName(),
|
.setPort(bindAddr.getPort()).setNumHandlers(HANDLER_COUNT)
|
||||||
bindAddr.getPort(), HANDLER_COUNT, false, conf,
|
.setVerbose(false).build();
|
||||||
null /*secretManager*/);
|
|
||||||
|
|
||||||
// set service-level authorization security policy
|
// set service-level authorization security policy
|
||||||
if (conf.getBoolean(
|
if (conf.getBoolean(
|
||||||
|
|
|
@ -725,6 +725,110 @@ public class RPC {
|
||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to construct instances of RPC server with specific options.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private Class<?> protocol = null;
|
||||||
|
private Object instance = null;
|
||||||
|
private String bindAddress = "0.0.0.0";
|
||||||
|
private int port = 0;
|
||||||
|
private int numHandlers = 1;
|
||||||
|
private int numReaders = -1;
|
||||||
|
private int queueSizePerHandler = -1;
|
||||||
|
private boolean verbose = false;
|
||||||
|
private final Configuration conf;
|
||||||
|
private SecretManager<? extends TokenIdentifier> secretManager = null;
|
||||||
|
private String portRangeConfig = null;
|
||||||
|
|
||||||
|
public Builder(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Mandatory field */
|
||||||
|
public Builder setProtocol(Class<?> protocol) {
|
||||||
|
this.protocol = protocol;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Mandatory field */
|
||||||
|
public Builder setInstance(Object instance) {
|
||||||
|
this.instance = instance;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: 0.0.0.0 */
|
||||||
|
public Builder setBindAddress(String bindAddress) {
|
||||||
|
this.bindAddress = bindAddress;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: 0 */
|
||||||
|
public Builder setPort(int port) {
|
||||||
|
this.port = port;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: 1 */
|
||||||
|
public Builder setNumHandlers(int numHandlers) {
|
||||||
|
this.numHandlers = numHandlers;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: -1 */
|
||||||
|
public Builder setnumReaders(int numReaders) {
|
||||||
|
this.numReaders = numReaders;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: -1 */
|
||||||
|
public Builder setQueueSizePerHandler(int queueSizePerHandler) {
|
||||||
|
this.queueSizePerHandler = queueSizePerHandler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: false */
|
||||||
|
public Builder setVerbose(boolean verbose) {
|
||||||
|
this.verbose = verbose;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: null */
|
||||||
|
public Builder setSecretManager(
|
||||||
|
SecretManager<? extends TokenIdentifier> secretManager) {
|
||||||
|
this.secretManager = secretManager;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Default: null */
|
||||||
|
public Builder setPortRangeConfig(String portRangeConfig) {
|
||||||
|
this.portRangeConfig = portRangeConfig;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the RPC Server.
|
||||||
|
* @throws IOException on error
|
||||||
|
* @throws HadoopIllegalArgumentException when mandatory fields are not set
|
||||||
|
*/
|
||||||
|
public Server build() throws IOException, HadoopIllegalArgumentException {
|
||||||
|
if (this.conf == null) {
|
||||||
|
throw new HadoopIllegalArgumentException("conf is not set");
|
||||||
|
}
|
||||||
|
if (this.protocol == null) {
|
||||||
|
throw new HadoopIllegalArgumentException("protocol is not set");
|
||||||
|
}
|
||||||
|
if (this.instance == null) {
|
||||||
|
throw new HadoopIllegalArgumentException("instance is not set");
|
||||||
|
}
|
||||||
|
|
||||||
|
return getProtocolEngine(this.protocol, this.conf).getServer(
|
||||||
|
this.protocol, this.instance, this.bindAddress, this.port,
|
||||||
|
this.numHandlers, this.numReaders, this.queueSizePerHandler,
|
||||||
|
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** An RPC Server. */
|
/** An RPC Server. */
|
||||||
public abstract static class Server extends org.apache.hadoop.ipc.Server {
|
public abstract static class Server extends org.apache.hadoop.ipc.Server {
|
||||||
boolean verbose;
|
boolean verbose;
|
||||||
|
|
|
@ -165,8 +165,10 @@ public class MiniRPCBenchmark {
|
||||||
new TestDelegationTokenSecretManager(24*60*60*1000,
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
7*24*60*60*1000,24*60*60*1000,3600000);
|
7*24*60*60*1000,24*60*60*1000,3600000);
|
||||||
secretManager.startThreads();
|
secretManager.startThreads();
|
||||||
rpcServer = RPC.getServer(MiniProtocol.class,
|
rpcServer = new RPC.Builder(conf).setProtocol(MiniProtocol.class)
|
||||||
this, DEFAULT_SERVER_ADDRESS, 0, 1, false, conf, secretManager);
|
.setInstance(this).setBindAddress(DEFAULT_SERVER_ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(1).setVerbose(false).setSecretManager(secretManager)
|
||||||
|
.build();
|
||||||
rpcServer.start();
|
rpcServer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -227,11 +227,14 @@ public class RPCCallBenchmark implements Tool, Configurable {
|
||||||
BlockingService service = TestProtobufRpcProto
|
BlockingService service = TestProtobufRpcProto
|
||||||
.newReflectiveBlockingService(serverImpl);
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
server = RPC.getServer(TestRpcService.class, service,
|
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
|
||||||
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
.setInstance(service).setBindAddress(opts.host).setPort(opts.port)
|
||||||
|
.setNumHandlers(opts.serverThreads).setVerbose(false).build();
|
||||||
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
} else if (opts.rpcEngine == WritableRpcEngine.class) {
|
||||||
server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
|
server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
opts.host, opts.port, opts.serverThreads, false, conf, null);
|
.setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
|
||||||
|
.setPort(opts.port).setNumHandlers(opts.serverThreads)
|
||||||
|
.setVerbose(false).build();
|
||||||
} else {
|
} else {
|
||||||
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
|
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,8 +175,9 @@ public class TestMultipleProtocolServer {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
server = RPC.getServer(Foo0.class,
|
server = new RPC.Builder(conf).setProtocol(Foo0.class)
|
||||||
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
|
.setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
|
||||||
|
@ -263,8 +264,9 @@ public class TestMultipleProtocolServer {
|
||||||
|
|
||||||
@Test(expected=IOException.class)
|
@Test(expected=IOException.class)
|
||||||
public void testIncorrectServerCreation() throws IOException {
|
public void testIncorrectServerCreation() throws IOException {
|
||||||
RPC.getServer(Foo1.class,
|
new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
|
||||||
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now test a PB service - a server hosts both PB and Writable Rpcs.
|
// Now test a PB service - a server hosts both PB and Writable Rpcs.
|
||||||
|
|
|
@ -113,7 +113,8 @@ public class TestProtoBufRpc {
|
||||||
.newReflectiveBlockingService(serverImpl);
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
// Get RPC server for server side implementation
|
// Get RPC server for server side implementation
|
||||||
server = RPC.getServer(TestRpcService.class, service, ADDRESS, PORT, conf);
|
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
|
||||||
|
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
// now the second protocol
|
// now the second protocol
|
||||||
|
|
|
@ -314,8 +314,9 @@ public class TestRPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConfRpc() throws Exception {
|
public void testConfRpc() throws Exception {
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 1, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(1).setVerbose(false).build();
|
||||||
// Just one handler
|
// Just one handler
|
||||||
int confQ = conf.getInt(
|
int confQ = conf.getInt(
|
||||||
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
|
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
|
||||||
|
@ -328,8 +329,11 @@ public class TestRPC {
|
||||||
assertEquals(confReaders, server.getNumReaders());
|
assertEquals(confReaders, server.getNumReaders());
|
||||||
server.stop();
|
server.stop();
|
||||||
|
|
||||||
server = RPC.getServer(TestProtocol.class,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(1).setnumReaders(3).setQueueSizePerHandler(200)
|
||||||
|
.setVerbose(false).build();
|
||||||
|
|
||||||
assertEquals(3, server.getNumReaders());
|
assertEquals(3, server.getNumReaders());
|
||||||
assertEquals(200, server.getMaxQueueSize());
|
assertEquals(200, server.getMaxQueueSize());
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -337,8 +341,8 @@ public class TestRPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProxyAddress() throws Exception {
|
public void testProxyAddress() throws Exception {
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, conf);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -362,8 +366,10 @@ public class TestRPC {
|
||||||
public void testSlowRpc() throws Exception {
|
public void testSlowRpc() throws Exception {
|
||||||
System.out.println("Testing Slow RPC");
|
System.out.println("Testing Slow RPC");
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -409,8 +415,8 @@ public class TestRPC {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testCallsInternal(Configuration conf) throws Exception {
|
private void testCallsInternal(Configuration conf) throws Exception {
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, conf);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
try {
|
try {
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -528,8 +534,9 @@ public class TestRPC {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
|
private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
|
|
||||||
server.refreshServiceAcl(conf, new TestPolicyProvider());
|
server.refreshServiceAcl(conf, new TestPolicyProvider());
|
||||||
|
|
||||||
|
@ -573,8 +580,9 @@ public class TestRPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testServerAddress() throws IOException {
|
public void testServerAddress() throws IOException {
|
||||||
Server server = RPC.getServer(TestProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
InetSocketAddress bindAddr = null;
|
InetSocketAddress bindAddr = null;
|
||||||
try {
|
try {
|
||||||
bindAddr = NetUtils.getConnectAddress(server);
|
bindAddr = NetUtils.getConnectAddress(server);
|
||||||
|
@ -668,8 +676,9 @@ public class TestRPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testErrorMsgForInsecureClient() throws Exception {
|
public void testErrorMsgForInsecureClient() throws Exception {
|
||||||
final Server server = RPC.getServer(TestProtocol.class,
|
final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
server.enableSecurity();
|
server.enableSecurity();
|
||||||
server.start();
|
server.start();
|
||||||
boolean succeeded = false;
|
boolean succeeded = false;
|
||||||
|
@ -693,8 +702,10 @@ public class TestRPC {
|
||||||
|
|
||||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
||||||
|
|
||||||
final Server multiServer = RPC.getServer(TestProtocol.class,
|
final Server multiServer = new RPC.Builder(conf)
|
||||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.build();
|
||||||
multiServer.enableSecurity();
|
multiServer.enableSecurity();
|
||||||
multiServer.start();
|
multiServer.start();
|
||||||
succeeded = false;
|
succeeded = false;
|
||||||
|
@ -748,8 +759,9 @@ public class TestRPC {
|
||||||
assertEquals("Expect no Reader threads running before test",
|
assertEquals("Expect no Reader threads running before test",
|
||||||
0, threadsBefore);
|
0, threadsBefore);
|
||||||
|
|
||||||
final Server server = RPC.getServer(TestProtocol.class,
|
final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
server.start();
|
server.start();
|
||||||
try {
|
try {
|
||||||
int threadsRunning = countThreads("Server$Listener$Reader");
|
int threadsRunning = countThreads("Server$Listener$Reader");
|
||||||
|
@ -762,6 +774,42 @@ public class TestRPC {
|
||||||
0, threadsAfter);
|
0, threadsAfter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRPCBuilder() throws Exception {
|
||||||
|
// Test mandatory field conf
|
||||||
|
try {
|
||||||
|
new RPC.Builder(null).setProtocol(TestProtocol.class)
|
||||||
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
|
fail("Didn't throw HadoopIllegalArgumentException");
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!(e instanceof HadoopIllegalArgumentException)) {
|
||||||
|
fail("Expecting HadoopIllegalArgumentException but caught " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Test mandatory field protocol
|
||||||
|
try {
|
||||||
|
new RPC.Builder(conf).setInstance(new TestImpl()).setBindAddress(ADDRESS)
|
||||||
|
.setPort(0).setNumHandlers(5).setVerbose(true).build();
|
||||||
|
fail("Didn't throw HadoopIllegalArgumentException");
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!(e instanceof HadoopIllegalArgumentException)) {
|
||||||
|
fail("Expecting HadoopIllegalArgumentException but caught " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Test mandatory field instance
|
||||||
|
try {
|
||||||
|
new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5)
|
||||||
|
.setVerbose(true).build();
|
||||||
|
fail("Didn't throw HadoopIllegalArgumentException");
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!(e instanceof HadoopIllegalArgumentException)) {
|
||||||
|
fail("Expecting HadoopIllegalArgumentException but caught " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
|
||||||
|
|
|
@ -131,8 +131,9 @@ public class TestRPCCompatibility {
|
||||||
public void testVersion0ClientVersion1Server() throws Exception {
|
public void testVersion0ClientVersion1Server() throws Exception {
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
TestImpl1 impl = new TestImpl1();
|
TestImpl1 impl = new TestImpl1();
|
||||||
server = RPC.getServer(TestProtocol1.class,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
|
||||||
impl, ADDRESS, 0, 2, false, conf, null);
|
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
|
||||||
|
.setVerbose(false).build();
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
@ -147,8 +148,9 @@ public class TestRPCCompatibility {
|
||||||
@Test // old client vs new server
|
@Test // old client vs new server
|
||||||
public void testVersion1ClientVersion0Server() throws Exception {
|
public void testVersion1ClientVersion0Server() throws Exception {
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
server = RPC.getServer(TestProtocol0.class,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
|
||||||
new TestImpl0(), ADDRESS, 0, 2, false, conf, null);
|
.setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
|
@ -198,8 +200,9 @@ System.out.println("echo int is NOT supported");
|
||||||
public void testVersion2ClientVersion1Server() throws Exception {
|
public void testVersion2ClientVersion1Server() throws Exception {
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
TestImpl1 impl = new TestImpl1();
|
TestImpl1 impl = new TestImpl1();
|
||||||
server = RPC.getServer(TestProtocol1.class,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
|
||||||
impl, ADDRESS, 0, 2, false, conf, null);
|
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
|
||||||
|
.setVerbose(false).build();
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
@ -219,8 +222,9 @@ System.out.println("echo int is NOT supported");
|
||||||
ProtocolSignature.resetCache();
|
ProtocolSignature.resetCache();
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
TestImpl2 impl = new TestImpl2();
|
TestImpl2 impl = new TestImpl2();
|
||||||
server = RPC.getServer(TestProtocol2.class,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
|
||||||
impl, ADDRESS, 0, 2, false, conf, null);
|
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
|
||||||
|
.setVerbose(false).build();
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
@ -290,8 +294,9 @@ System.out.println("echo int is NOT supported");
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testVersionMismatch() throws IOException {
|
public void testVersionMismatch() throws IOException {
|
||||||
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
|
||||||
false, conf, null);
|
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
|
@ -308,8 +313,9 @@ System.out.println("echo int is NOT supported");
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIsMethodSupported() throws IOException {
|
public void testIsMethodSupported() throws IOException {
|
||||||
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
|
||||||
false, conf, null);
|
.setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
server.start();
|
server.start();
|
||||||
addr = NetUtils.getConnectAddress(server);
|
addr = NetUtils.getConnectAddress(server);
|
||||||
|
|
||||||
|
@ -332,8 +338,9 @@ System.out.println("echo int is NOT supported");
|
||||||
@Test
|
@Test
|
||||||
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
|
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
|
||||||
TestImpl1 impl = new TestImpl1();
|
TestImpl1 impl = new TestImpl1();
|
||||||
server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
|
server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
|
||||||
conf, null);
|
.setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
|
||||||
|
.setVerbose(false).build();
|
||||||
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
|
|
@ -235,9 +235,11 @@ public class TestSaslRPC {
|
||||||
@Test
|
@Test
|
||||||
public void testDigestRpc() throws Exception {
|
public void testDigestRpc() throws Exception {
|
||||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||||
final Server server = RPC.getServer(TestSaslProtocol.class,
|
final Server server = new RPC.Builder(conf)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm);
|
.setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(sm).build();
|
||||||
|
|
||||||
doDigestRpc(server, sm);
|
doDigestRpc(server, sm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,9 +248,10 @@ public class TestSaslRPC {
|
||||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||||
try {
|
try {
|
||||||
SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
|
SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
|
||||||
final Server server = RPC.getServer(TestSaslProtocol.class,
|
final Server server = new RPC.Builder(conf)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5,
|
.setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
|
||||||
true, conf, sm);
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5)
|
||||||
|
.setVerbose(true).setSecretManager(sm).build();
|
||||||
doDigestRpc(server, sm);
|
doDigestRpc(server, sm);
|
||||||
} finally {
|
} finally {
|
||||||
SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
|
SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
|
||||||
|
@ -257,8 +260,9 @@ public class TestSaslRPC {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSecureToInsecureRpc() throws Exception {
|
public void testSecureToInsecureRpc() throws Exception {
|
||||||
Server server = RPC.getServer(TestSaslProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, null);
|
.setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
server.disableSecurity();
|
server.disableSecurity();
|
||||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||||
doDigestRpc(server, sm);
|
doDigestRpc(server, sm);
|
||||||
|
@ -267,8 +271,10 @@ public class TestSaslRPC {
|
||||||
@Test
|
@Test
|
||||||
public void testErrorMessage() throws Exception {
|
public void testErrorMessage() throws Exception {
|
||||||
BadTokenSecretManager sm = new BadTokenSecretManager();
|
BadTokenSecretManager sm = new BadTokenSecretManager();
|
||||||
final Server server = RPC.getServer(TestSaslProtocol.class,
|
final Server server = new RPC.Builder(conf)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm);
|
.setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(sm).build();
|
||||||
|
|
||||||
boolean succeeded = false;
|
boolean succeeded = false;
|
||||||
try {
|
try {
|
||||||
|
@ -355,8 +361,10 @@ public class TestSaslRPC {
|
||||||
@Test
|
@Test
|
||||||
public void testPerConnectionConf() throws Exception {
|
public void testPerConnectionConf() throws Exception {
|
||||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||||
final Server server = RPC.getServer(TestSaslProtocol.class,
|
final Server server = new RPC.Builder(conf)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm);
|
.setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(sm).build();
|
||||||
server.start();
|
server.start();
|
||||||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
@ -418,8 +426,10 @@ public class TestSaslRPC {
|
||||||
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
System.out.println("UGI: " + current);
|
System.out.println("UGI: " + current);
|
||||||
|
|
||||||
Server server = RPC.getServer(TestSaslProtocol.class, new TestSaslImpl(),
|
Server server = new RPC.Builder(newConf)
|
||||||
ADDRESS, 0, 5, true, newConf, null);
|
.setProtocol(TestSaslProtocol.class).setInstance(new TestSaslImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.build();
|
||||||
TestSaslProtocol proxy = null;
|
TestSaslProtocol proxy = null;
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -441,8 +451,9 @@ public class TestSaslRPC {
|
||||||
@Test
|
@Test
|
||||||
public void testDigestAuthMethod() throws Exception {
|
public void testDigestAuthMethod() throws Exception {
|
||||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||||
Server server = RPC.getServer(TestSaslProtocol.class,
|
Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
|
||||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm);
|
.setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||||
|
|
|
@ -155,8 +155,9 @@ public class TestDoAsEffectiveUser {
|
||||||
conf.setStrings(ProxyUsers
|
conf.setStrings(ProxyUsers
|
||||||
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
|
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
|
||||||
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 5, true, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
|
|
||||||
refreshConf(conf);
|
refreshConf(conf);
|
||||||
try {
|
try {
|
||||||
|
@ -197,8 +198,9 @@ public class TestDoAsEffectiveUser {
|
||||||
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
||||||
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
||||||
"group1");
|
"group1");
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
refreshConf(conf);
|
refreshConf(conf);
|
||||||
try {
|
try {
|
||||||
|
@ -244,8 +246,9 @@ public class TestDoAsEffectiveUser {
|
||||||
"20.20.20.20"); //Authorized IP address
|
"20.20.20.20"); //Authorized IP address
|
||||||
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
||||||
"group1");
|
"group1");
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
refreshConf(conf);
|
refreshConf(conf);
|
||||||
|
|
||||||
|
@ -286,8 +289,9 @@ public class TestDoAsEffectiveUser {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.setStrings(ProxyUsers
|
conf.setStrings(ProxyUsers
|
||||||
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
|
.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -325,8 +329,9 @@ public class TestDoAsEffectiveUser {
|
||||||
public void testRealUserGroupNotSpecified() throws IOException {
|
public void testRealUserGroupNotSpecified() throws IOException {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -366,9 +371,9 @@ public class TestDoAsEffectiveUser {
|
||||||
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
|
||||||
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
conf.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
|
||||||
"group3");
|
"group3");
|
||||||
Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS,
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
0, 2, false, conf, null);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(2).setVerbose(false).build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -414,8 +419,9 @@ public class TestDoAsEffectiveUser {
|
||||||
conf
|
conf
|
||||||
.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
final Server server = RPC.getServer(TestProtocol.class, new TestImpl(),
|
final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
ADDRESS, 0, 5, true, conf, sm);
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
@ -468,8 +474,10 @@ public class TestDoAsEffectiveUser {
|
||||||
newConf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
|
newConf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(newConf);
|
UserGroupInformation.setConfiguration(newConf);
|
||||||
final Server server = RPC.getServer(TestProtocol.class, new TestImpl(),
|
final Server server = new RPC.Builder(newConf)
|
||||||
ADDRESS, 0, 5, true, newConf, sm);
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(sm).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue