diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 8d87957437a..7e6c7e3be26 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1120,12 +1120,11 @@ public class Client {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);
- Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
- calls.remove(callId);
+ final Call call = calls.remove(callId);
call.setRpcResponse(value);
// verify that length was correct
@@ -1157,7 +1156,7 @@ public class Client {
}
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
if (status == RpcStatusProto.ERROR) {
- calls.remove(callId);
+ final Call call = calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
@@ -1288,85 +1287,6 @@ public class Client {
clientExcecutorFactory.unrefAndCleanup();
}
- /**
- * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
- * for RPC_BUILTIN
- */
- public Writable call(Writable param, InetSocketAddress address)
- throws IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
- conf);
- return call(RpcKind.RPC_BUILTIN, param, remoteId);
- }
-
- /**
- * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
- * Class, UserGroupInformation, int, Configuration)}
- * except that rpcKind is writable.
- */
- public Writable call(Writable param, InetSocketAddress addr,
- Class> protocol, UserGroupInformation ticket,
- int rpcTimeout, Configuration conf) throws IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, rpcTimeout, conf);
- return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
- }
-
- /**
- * Same as {@link #call(Writable, InetSocketAddress,
- * Class, UserGroupInformation, int, Configuration)}
- * except that specifying serviceClass.
- */
- public Writable call(Writable param, InetSocketAddress addr,
- Class> protocol, UserGroupInformation ticket,
- int rpcTimeout, int serviceClass, Configuration conf)
- throws IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, rpcTimeout, conf);
- return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
- }
-
- /**
- * Make a call, passing param
, to the IPC server running at
- * address
which is servicing the protocol
protocol,
- * with the ticket
credentials, rpcTimeout
as
- * timeout and conf
as conf for this connection, returning the
- * value. Throws exceptions if there are network problems or if the remote
- * code threw an exception.
- */
- public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
- Class> protocol, UserGroupInformation ticket,
- int rpcTimeout, Configuration conf) throws IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, rpcTimeout, conf);
- return call(rpcKind, param, remoteId);
- }
-
- /**
- * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
- * except the rpcKind is RPC_BUILTIN
- */
- public Writable call(Writable param, ConnectionId remoteId)
- throws IOException {
- return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
- }
-
- /**
- * Make a call, passing rpcRequest
, to the IPC server defined by
- * remoteId
, returning the rpc respond.
- *
- * @param rpcKind
- * @param rpcRequest - contains serialized method and method parameters
- * @param remoteId - the target rpc server
- * @returns the rpc response
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- */
- public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
- ConnectionId remoteId) throws IOException {
- return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
- }
-
/**
* Make a call, passing rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc respond.
@@ -1387,23 +1307,6 @@ public class Client {
fallbackToSimpleAuth);
}
- /**
- * Make a call, passing rpcRequest
, to the IPC server defined by
- * remoteId
, returning the rpc response.
- *
- * @param rpcKind
- * @param rpcRequest - contains serialized method and method parameters
- * @param remoteId - the target rpc server
- * @param serviceClass - service class for RPC
- * @returns the rpc response
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- */
- public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
- ConnectionId remoteId, int serviceClass) throws IOException {
- return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
- }
-
/**
* Make a call, passing rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc response.
@@ -1418,7 +1321,7 @@ public class Client {
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
- public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
@@ -1620,12 +1523,6 @@ public class Client {
return saslQop;
}
- static ConnectionId getConnectionId(InetSocketAddress addr,
- Class> protocol, UserGroupInformation ticket, int rpcTimeout,
- Configuration conf) throws IOException {
- return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
- }
-
/**
* Returns a ConnectionId object.
* @param addr Remote address for the connection.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 46f33bafe5c..a1db6be7c43 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -221,7 +221,7 @@ public class WritableRpcEngine implements RpcEngine {
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
- ticket, rpcTimeout, conf);
+ ticket, rpcTimeout, null, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 78dcdcd9c4c..d65818238eb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
-import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -63,7 +62,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
@@ -78,9 +76,9 @@ import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
@@ -91,6 +89,7 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.base.Supplier;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
@@ -122,6 +121,33 @@ public class TestIPC {
/** Directory where we can count open file descriptors on Linux */
private static final File FD_DIR = new File("/proc/self/fd");
+ static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
+ Configuration conf) throws IOException {
+ return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
+ conf);
+ }
+
+ static Writable call(Client client, InetSocketAddress addr,
+ int serviceClass, Configuration conf) throws IOException {
+ final LongWritable param = new LongWritable(RANDOM.nextLong());
+ final ConnectionId remoteId = getConnectionId(addr, MIN_SLEEP_TIME, conf);
+ return client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass,
+ null);
+ }
+
+ static LongWritable call(Client client, long param, InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return call(client, new LongWritable(param), addr, 0, conf);
+ }
+
+ static LongWritable call(Client client, LongWritable param,
+ InetSocketAddress addr, int rpcTimeout, Configuration conf)
+ throws IOException {
+ final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
+ return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
+ RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+ }
+
private static class TestServer extends Server {
// Tests can set callListener to run a piece of code each time the server
// receives a call. This code executes on the server thread, so it has
@@ -183,10 +209,9 @@ public class TestIPC {
public void run() {
for (int i = 0; i < count; i++) {
try {
- LongWritable param = new LongWritable(RANDOM.nextLong());
- LongWritable value =
- (LongWritable)client.call(param, server, null, null, 0, conf);
- if (!param.equals(value)) {
+ final long param = RANDOM.nextLong();
+ LongWritable value = call(client, param, server, conf);
+ if (value.get() != param) {
LOG.fatal("Call failed!");
failed = true;
break;
@@ -226,9 +251,8 @@ public class TestIPC {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- LongWritable param = new LongWritable(RANDOM.nextLong());
- LongWritable value = (LongWritable) client.call(param,
- NetUtils.getConnectAddress(server), null, null, 0, conf);
+ LongWritable value = call(client, RANDOM.nextLong(),
+ NetUtils.getConnectAddress(server), conf);
return returnValue(value);
}
@@ -298,8 +322,7 @@ public class TestIPC {
Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
- client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null, 0, conf);
+ call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
String message = e.getMessage();
@@ -412,7 +435,7 @@ public class TestIPC {
LongWritable param = clientParamClass.newInstance();
try {
- client.call(param, addr, null, null, 0, conf);
+ call(client, param, addr, 0, conf);
fail("Expected an exception to have been thrown");
} catch (Throwable t) {
assertExceptionContains(t, "Injected fault");
@@ -422,7 +445,7 @@ public class TestIPC {
// ie the internal state of the client or server should not be broken
// by the failed call
WRITABLE_FAULTS_ENABLED = false;
- client.call(param, addr, null, null, 0, conf);
+ call(client, param, addr, 0, conf);
} finally {
client.stop();
@@ -536,8 +559,7 @@ public class TestIPC {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
- client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null, 0, conf);
+ call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault"));
@@ -574,8 +596,7 @@ public class TestIPC {
// Call should fail due to injected exception.
InetSocketAddress address = NetUtils.getConnectAddress(server);
try {
- client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null, 0, conf);
+ call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (Exception e) {
LOG.info("caught expected exception", e);
@@ -586,8 +607,7 @@ public class TestIPC {
// (i.e. it should not have cached a half-constructed connection)
Mockito.reset(spyFactory);
- client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null, 0, conf);
+ call(client, RANDOM.nextLong(), address, conf);
} finally {
client.stop();
server.stop();
@@ -605,15 +625,15 @@ public class TestIPC {
Client client = new Client(LongWritable.class, conf);
// set timeout to be less than MIN_SLEEP_TIME
try {
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, MIN_SLEEP_TIME/2, conf);
+ call(client, new LongWritable(RANDOM.nextLong()), addr,
+ MIN_SLEEP_TIME / 2, conf);
fail("Expected an exception to have been thrown");
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
}
// set timeout to be bigger than 3*ping interval
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
+ call(client, new LongWritable(RANDOM.nextLong()), addr,
+ 3 * PING_INTERVAL + MIN_SLEEP_TIME, conf);
client.stop();
}
@@ -629,8 +649,8 @@ public class TestIPC {
Client client = new Client(LongWritable.class, conf);
// set the rpc timeout to twice the MIN_SLEEP_TIME
try {
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, MIN_SLEEP_TIME*2, conf);
+ call(client, new LongWritable(RANDOM.nextLong()), addr,
+ MIN_SLEEP_TIME * 2, conf);
fail("Expected an exception to have been thrown");
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
@@ -743,8 +763,8 @@ public class TestIPC {
public void run() {
Client client = new Client(LongWritable.class, conf);
try {
- client.call(new LongWritable(Thread.currentThread().getId()),
- addr, null, null, 60000, conf);
+ call(client, new LongWritable(Thread.currentThread().getId()),
+ addr, 60000, conf);
} catch (Throwable e) {
LOG.error(e);
failures.incrementAndGet();
@@ -875,8 +895,7 @@ public class TestIPC {
public void run() {
Client client = new Client(LongWritable.class, clientConf);
try {
- client.call(new LongWritable(Thread.currentThread().getId()),
- addr, null, null, 0, clientConf);
+ call(client, Thread.currentThread().getId(), addr, clientConf);
callReturned.countDown();
Thread.sleep(10000);
} catch (IOException e) {
@@ -931,16 +950,15 @@ public class TestIPC {
}
}
}
-
+
/**
* Make a call from a client and verify if header info is changed in server side
*/
- private void callAndVerify(Server server, InetSocketAddress addr,
+ private static void callAndVerify(Server server, InetSocketAddress addr,
int serviceClass, boolean noChanged) throws IOException{
Client client = new Client(LongWritable.class, conf);
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
+ call(client, addr, serviceClass, conf);
Connection connection = server.getConnections()[0];
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
@@ -956,13 +974,11 @@ public class TestIPC {
// start client
Client client = new Client(LongWritable.class, conf);
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, MIN_SLEEP_TIME, 0, conf);
+ call(client, addr, 0, conf);
client.stop();
// This call should throw IOException.
- client.call(new LongWritable(RANDOM.nextLong()),
- addr, null, null, MIN_SLEEP_TIME, 0, conf);
+ call(client, addr, 0, conf);
}
/**
@@ -992,7 +1008,7 @@ public class TestIPC {
@Test(timeout=30000)
public void testInterrupted() {
Client client = new Client(LongWritable.class, conf);
- client.getClientExecutor().submit(new Runnable() {
+ Client.getClientExecutor().submit(new Runnable() {
public void run() {
while(true);
}
@@ -1007,7 +1023,7 @@ public class TestIPC {
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
}
// Clear Thread interrupt
- Thread.currentThread().interrupted();
+ Thread.interrupted();
}
private long countOpenFileDescriptors() {
@@ -1363,11 +1379,10 @@ public class TestIPC {
int maxTimeoutRetries) throws IOException {
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
- Client client = new Client(IntWritable.class, conf, mockFactory);
+ Client client = new Client(LongWritable.class, conf, mockFactory);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
try {
- client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0,
- conf);
+ call(client, RANDOM.nextLong(), address, conf);
fail("Not throwing the SocketTimeoutException");
} catch (SocketTimeoutException e) {
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
index a3bf995e5d4..546cb8f472e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
@@ -66,6 +68,14 @@ public class TestIPCServerResponder {
BYTES[i] = (byte) ('a' + (i % 26));
}
+ static Writable call(Client client, Writable param,
+ InetSocketAddress address) throws IOException {
+ final ConnectionId remoteId = ConnectionId.getConnectionId(address, null,
+ null, 0, null, conf);
+ return client.call(RpcKind.RPC_BUILTIN, param, remoteId,
+ RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+ }
+
private static class TestServer extends Server {
private boolean sleep;
@@ -113,7 +123,7 @@ public class TestIPCServerResponder {
byte[] bytes = new byte[byteSize];
System.arraycopy(BYTES, 0, bytes, 0, byteSize);
Writable param = new BytesWritable(bytes);
- client.call(param, address);
+ call(client, param, address);
Thread.sleep(RANDOM.nextInt(20));
} catch (Exception e) {
LOG.fatal("Caught Exception", e);
@@ -209,17 +219,16 @@ public class TestIPCServerResponder {
// calls should return immediately, check the sequence number is
// increasing
- assertEquals(0,
- ((IntWritable)client.call(wait0, address)).get());
- assertEquals(1,
- ((IntWritable)client.call(wait0, address)).get());
+ assertEquals(0, ((IntWritable)call(client, wait0, address)).get());
+ assertEquals(1, ((IntWritable)call(client, wait0, address)).get());
// do a call in the background that will have a deferred response
final ExecutorService exec = Executors.newCachedThreadPool();
Future future1 = exec.submit(new Callable() {
@Override
public Integer call() throws IOException {
- return ((IntWritable)client.call(wait1, address)).get();
+ return ((IntWritable)TestIPCServerResponder.call(
+ client, wait1, address)).get();
}
});
// make sure it blocked
@@ -237,14 +246,14 @@ public class TestIPCServerResponder {
// proves the handler isn't tied up, and that the prior sequence number
// was consumed
- assertEquals(3,
- ((IntWritable)client.call(wait0, address)).get());
+ assertEquals(3, ((IntWritable)call(client, wait0, address)).get());
// another call with wait count of 2
Future future2 = exec.submit(new Callable() {
@Override
public Integer call() throws IOException {
- return ((IntWritable)client.call(wait2, address)).get();
+ return ((IntWritable)TestIPCServerResponder.call(
+ client, wait2, address)).get();
}
});
// make sure it blocked
@@ -286,8 +295,7 @@ public class TestIPCServerResponder {
assertFalse(future2.isDone());
// call should return immediately
- assertEquals(5,
- ((IntWritable)client.call(wait0, address)).get());
+ assertEquals(5, ((IntWritable)call(client, wait0, address)).get());
// trigger last waiting call
waitingCalls[1].sendResponse();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
index a32ea2cabc9..8df3b1ded6e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.ipc;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
@@ -42,16 +42,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -454,7 +451,12 @@ public class TestSaslRPC {
}
}
}
-
+
+ static ConnectionId getConnectionId(Configuration conf) throws IOException {
+ return ConnectionId.getConnectionId(new InetSocketAddress(0),
+ TestSaslProtocol.class, null, 0, null, conf);
+ }
+
@Test
public void testPingInterval() throws Exception {
Configuration newConf = new Configuration(conf);
@@ -464,14 +466,12 @@ public class TestSaslRPC {
// set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
- ConnectionId remoteId = ConnectionId.getConnectionId(
- new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
+ ConnectionId remoteId = getConnectionId(newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval());
// set doPing to false
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
- remoteId = ConnectionId.getConnectionId(
- new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
+ remoteId = getConnectionId(newConf);
assertEquals(0, remoteId.getPingInterval());
}