svn merge -c 1507259 from trunk for HADOOP-9756. Remove the deprecated getServer(..) methods from RPC.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507586 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-07-27 07:01:51 +00:00
parent 4c045ebafd
commit 3c08e71316
11 changed files with 76 additions and 159 deletions

View File

@ -226,6 +226,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9760. Move GSet and related classes to common from HDFS.
(suresh)
HADOOP-9756. Remove the deprecated getServer(..) methods from RPC.
(Junping Du via szetszwo)
HADOOP-9770. Make RetryCache#state non volatile. (suresh)
OPTIMIZATIONS

View File

@ -2220,7 +2220,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
doc.appendChild(conf);
conf.appendChild(doc.createTextNode("\n"));
handleDeprecation(); //ensure properties is set and deprecation is handled
for (Enumeration e = properties.keys(); e.hasMoreElements();) {
for (Enumeration<Object> e = properties.keys(); e.hasMoreElements();) {
String name = (String)e.nextElement();
Object object = properties.get(name);
String value = null;

View File

@ -642,104 +642,6 @@ public class RPC {
+ proxy.getClass());
}
/** Construct a server for a protocol implementation instance listening on a
* port and address.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}
/** Construct a server for a protocol implementation instance listening on a
* port and address.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf)
throws IOException {
return getServer(instance.getClass(), // use impl class for protocol
instance, bindAddress, port, numHandlers, false, conf, null,
null);
}
/** Construct a server for a protocol implementation instance.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress,
int port, Configuration conf)
throws IOException {
return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
null);
}
/** Construct a server for a protocol implementation instance.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
boolean verbose, Configuration conf)
throws IOException {
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
conf, null, null);
}
/** Construct a server for a protocol implementation instance.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
conf, secretManager, null);
}
/**
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
verbose, conf, secretManager, portRangeConfig);
}
/** Construct a server for a protocol implementation instance.
* @deprecated Please use {@link Builder} to build the {@link Server}
*/
@Deprecated
public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
Server getServer(Class<PROTO> protocol,
IMPL instance, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers,
numReaders, queueSizePerHandler, verbose, conf, secretManager,
null);
}
/**
* Class to construct instances of RPC server with specific options.
*/

View File

@ -217,14 +217,14 @@ public class TestIPC {
}
@Test
public void testSerial() throws Exception {
public void testSerial() throws IOException, InterruptedException {
testSerial(3, false, 2, 5, 100);
testSerial(3, true, 2, 5, 10);
}
public void testSerial(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount)
throws Exception {
throws IOException, InterruptedException {
Server server = new TestServer(handlerCount, handlerSleep);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
@ -250,7 +250,7 @@ public class TestIPC {
}
@Test
public void testStandAloneClient() throws Exception {
public void testStandAloneClient() throws IOException {
Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
@ -350,7 +350,8 @@ public class TestIPC {
Class<? extends LongWritable> clientParamClass,
Class<? extends LongWritable> serverParamClass,
Class<? extends LongWritable> serverResponseClass,
Class<? extends LongWritable> clientResponseClass) throws Exception {
Class<? extends LongWritable> clientResponseClass)
throws IOException, InstantiationException, IllegalAccessException {
// start server
Server server = new TestServer(1, false,
@ -481,7 +482,7 @@ public class TestIPC {
* to the client.
*/
@Test
public void testSocketFactoryException() throws Exception {
public void testSocketFactoryException() throws IOException {
SocketFactory mockFactory = mock(SocketFactory.class);
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
Client client = new Client(LongWritable.class, conf, mockFactory);
@ -503,7 +504,7 @@ public class TestIPC {
* HADOOP-7428.
*/
@Test
public void testRTEDuringConnectionSetup() throws Exception {
public void testRTEDuringConnectionSetup() throws IOException {
// Set up a socket factory which returns sockets which
// throw an RTE when setSoTimeout is called.
SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
@ -544,7 +545,7 @@ public class TestIPC {
}
@Test
public void testIpcTimeout() throws Exception {
public void testIpcTimeout() throws IOException {
// start server
Server server = new TestServer(1, true);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
@ -566,7 +567,7 @@ public class TestIPC {
}
@Test
public void testIpcConnectTimeout() throws Exception {
public void testIpcConnectTimeout() throws IOException {
// start server
Server server = new TestServer(1, true);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
@ -589,7 +590,7 @@ public class TestIPC {
* Check service class byte in IPC header is correct on wire.
*/
@Test(timeout=60000)
public void testIpcWithServiceClass() throws Exception {
public void testIpcWithServiceClass() throws IOException {
// start server
Server server = new TestServer(5, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
@ -616,7 +617,7 @@ public class TestIPC {
* Make a call from a client and verify if header info is changed in server side
*/
private void callAndVerify(Server server, InetSocketAddress addr,
int serviceClass, boolean noChanged) throws Exception{
int serviceClass, boolean noChanged) throws IOException{
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
@ -650,7 +651,7 @@ public class TestIPC {
* and stopping IPC servers.
*/
@Test(timeout=60000)
public void testSocketLeak() throws Exception {
public void testSocketLeak() throws IOException {
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
long startFds = countOpenFileDescriptors();
@ -670,31 +671,31 @@ public class TestIPC {
}
@Test
public void testIpcFromHadoop_0_18_13() throws Exception {
public void testIpcFromHadoop_0_18_13() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
}
@Test
public void testIpcFromHadoop0_20_3() throws Exception {
public void testIpcFromHadoop0_20_3() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
}
@Test
public void testIpcFromHadoop0_21_0() throws Exception {
public void testIpcFromHadoop0_21_0() throws IOException {
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
}
@Test
public void testHttpGetResponse() throws Exception {
public void testHttpGetResponse() throws IOException {
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
}
@Test
public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
Configuration conf = new Configuration();
// set max retries to 0
conf.setInt(
@ -720,7 +721,7 @@ public class TestIPC {
* (2) the rpc client receives the same call id/retry from the rpc server.
*/
@Test
public void testCallIdAndRetry() throws Exception {
public void testCallIdAndRetry() throws IOException {
final CallInfo info = new CallInfo();
// Override client to store the call info and check response
@ -772,7 +773,7 @@ public class TestIPC {
* Test the retry count while used in a retry proxy.
*/
@Test
public void testRetryProxy() throws Exception {
public void testRetryProxy() throws IOException {
final Client client = new Client(LongWritable.class, conf);
final TestServer server = new TestServer(1, false);
@ -807,7 +808,7 @@ public class TestIPC {
* Test if the rpc server gets the default retry count (0) from client.
*/
@Test
public void testInitialCallRetryCount() throws Exception {
public void testInitialCallRetryCount() throws IOException {
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
@ -838,7 +839,7 @@ public class TestIPC {
* Test if the rpc server gets the retry count from client.
*/
@Test
public void testCallRetryCount() throws Exception {
public void testCallRetryCount() throws IOException {
final int retryCount = 255;
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
@ -870,9 +871,11 @@ public class TestIPC {
/**
* Tests that client generates a unique sequential call ID for each RPC call,
* even if multiple threads are using the same client.
* @throws InterruptedException
*/
@Test
public void testUniqueSequentialCallIds() throws Exception {
public void testUniqueSequentialCallIds()
throws IOException, InterruptedException {
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
TestServer server = new TestServer(serverThreads, false);
@ -937,7 +940,7 @@ public class TestIPC {
private void doIpcVersionTest(
byte[] requestData,
byte[] expectedResponse) throws Exception {
byte[] expectedResponse) throws IOException {
Server server = new TestServer(1, true);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();

View File

@ -115,7 +115,8 @@ public class TestIPCServerResponder extends TestCase {
}
}
public void testResponseBuffer() throws Exception {
public void testResponseBuffer()
throws IOException, InterruptedException {
Server.INITIAL_RESP_BUF_SIZE = 1;
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
1);
@ -123,7 +124,8 @@ public class TestIPCServerResponder extends TestCase {
conf = new Configuration(); // reset configuration
}
public void testServerResponder() throws Exception {
public void testServerResponder()
throws IOException, InterruptedException {
testServerResponder(10, true, 1, 10, 200);
}
@ -131,7 +133,8 @@ public class TestIPCServerResponder extends TestCase {
final boolean handlerSleep,
final int clientCount,
final int callerCount,
final int callCount) throws Exception {
final int callCount) throws IOException,
InterruptedException {
Server server = new TestServer(handlerCount, handlerSleep);
server.start();

View File

@ -323,7 +323,7 @@ public class TestRPC {
}
@Test
public void testConfRpc() throws Exception {
public void testConfRpc() throws IOException {
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(1).setVerbose(false).build();
@ -350,7 +350,7 @@ public class TestRPC {
}
@Test
public void testProxyAddress() throws Exception {
public void testProxyAddress() throws IOException {
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
TestProtocol proxy = null;
@ -372,7 +372,7 @@ public class TestRPC {
}
@Test
public void testSlowRpc() throws Exception {
public void testSlowRpc() throws IOException {
System.out.println("Testing Slow RPC");
// create a server with two handlers
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
@ -418,11 +418,11 @@ public class TestRPC {
}
@Test
public void testCalls() throws Exception {
public void testCalls() throws IOException {
testCallsInternal(conf);
}
private void testCallsInternal(Configuration conf) throws Exception {
private void testCallsInternal(Configuration conf) throws IOException {
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
TestProtocol proxy = null;
@ -540,7 +540,7 @@ public class TestRPC {
}
private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
private void doRPCs(Configuration conf, boolean expectFailure) throws IOException {
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.setNumHandlers(5).setVerbose(true).build();
@ -599,7 +599,7 @@ public class TestRPC {
}
@Test
public void testAuthorization() throws Exception {
public void testAuthorization() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
true);
@ -626,7 +626,7 @@ public class TestRPC {
* Switch off setting socketTimeout values on RPC sockets.
* Verify that RPC calls still work ok.
*/
public void testNoPings() throws Exception {
public void testNoPings() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean("ipc.client.ping", false);
@ -638,10 +638,10 @@ public class TestRPC {
/**
* Test stopping a non-registered proxy
* @throws Exception
* @throws IOException
*/
@Test(expected=HadoopIllegalArgumentException.class)
public void testStopNonRegisteredProxy() throws Exception {
public void testStopNonRegisteredProxy() throws IOException {
RPC.stopProxy(null);
}
@ -650,7 +650,7 @@ public class TestRPC {
* be stopped without error.
*/
@Test
public void testStopMockObject() throws Exception {
public void testStopMockObject() throws IOException {
RPC.stopProxy(MockitoUtil.mockProtocol(TestProtocol.class));
}
@ -681,7 +681,7 @@ public class TestRPC {
}
@Test
public void testErrorMsgForInsecureClient() throws Exception {
public void testErrorMsgForInsecureClient() throws IOException {
Configuration serverConf = new Configuration(conf);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
serverConf);
@ -766,7 +766,7 @@ public class TestRPC {
* Test that server.stop() properly stops all threads
*/
@Test
public void testStopsAllThreads() throws Exception {
public void testStopsAllThreads() throws IOException, InterruptedException {
int threadsBefore = countThreads("Server$Listener$Reader");
assertEquals("Expect no Reader threads running before test",
0, threadsBefore);
@ -797,7 +797,7 @@ public class TestRPC {
}
@Test
public void testRPCBuilder() throws Exception {
public void testRPCBuilder() throws IOException {
// Test mandatory field conf
try {
new RPC.Builder(null).setProtocol(TestProtocol.class)
@ -833,11 +833,13 @@ public class TestRPC {
}
@Test(timeout=90000)
public void testRPCInterruptedSimple() throws Exception {
public void testRPCInterruptedSimple() throws IOException {
final Configuration conf = new Configuration();
Server server = RPC.getServer(
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS)
.setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(null).build();
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);
@ -862,9 +864,10 @@ public class TestRPC {
@Test(timeout=30000)
public void testRPCInterrupted() throws IOException, InterruptedException {
final Configuration conf = new Configuration();
Server server = RPC.getServer(
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS)
.setPort(0).setNumHandlers(5).setVerbose(true)
.setSecretManager(null).build();
server.start();
@ -922,7 +925,7 @@ public class TestRPC {
assertTrue("rpc got exception " + error.get(), error.get() == null);
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);
}

View File

@ -278,7 +278,7 @@ System.out.println("echo int is NOT supported");
TestProtocol3.class.getMethod("echo_alias", int.class));
assertFalse(intEchoHash == intEchoHashAlias);
// Make sure that methods with the same returninig type and method name but
// Make sure that methods with the same returning type and method name but
// larger number of parameter types have different hash code
int intEchoHash2 = ProtocolSignature.getFingerprint(
TestProtocol3.class.getMethod("echo", int.class, int.class));

View File

@ -35,7 +35,7 @@ import org.junit.Test;
public class TestSocketFactory {
@Test
public void testSocketFactoryAsKeyInMap() throws Exception {
public void testSocketFactoryAsKeyInMap() {
Map<SocketFactory, Integer> dummyCache = new HashMap<SocketFactory, Integer>();
int toBeCached1 = 1;
int toBeCached2 = 2;

View File

@ -344,7 +344,6 @@ public class NameNodeProxies {
}
/** Creates the Failover proxy provider instance*/
@SuppressWarnings("unchecked")
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
@ -354,9 +353,9 @@ public class NameNodeProxies {
try {
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
.getConstructor(Configuration.class, URI.class, Class.class);
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
xface);
return (FailoverProxyProvider<T>) provider;
return provider;
} catch (Exception e) {
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
if (LOG.isDebugEnabled()) {

View File

@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
* This class HAS to be in this package to access package private
* methods/classes.
*/
@SuppressWarnings({"unchecked" , "deprecation"})
@SuppressWarnings({"unchecked"})
public class TaskAttemptListenerImpl extends CompositeService
implements TaskUmbilicalProtocol, TaskAttemptListener {
@ -118,11 +118,14 @@ public class TaskAttemptListenerImpl extends CompositeService
protected void startRpcServer() {
Configuration conf = getConfig();
try {
server =
RPC.getServer(TaskUmbilicalProtocol.class, this, "0.0.0.0", 0,
conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT),
false, conf, jobTokenSecretManager);
server =
new RPC.Builder(conf).setProtocol(TaskUmbilicalProtocol.class)
.setInstance(this).setBindAddress("0.0.0.0")
.setPort(0).setNumHandlers(
conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT))
.setVerbose(false).setSecretManager(jobTokenSecretManager)
.build();
// Enable service authorization?
if (conf.getBoolean(

View File

@ -137,11 +137,12 @@ public class TestAuditLogger extends TestCase {
/**
* Test {@link AuditLogger} with IP set.
*/
@SuppressWarnings("deprecation")
public void testAuditLoggerWithIP() throws Exception {
Configuration conf = new Configuration();
// start the IPC server
Server server = RPC.getServer(new MyTestRPCServer(), "0.0.0.0", 0, conf);
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
.setPort(0).build();
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);