diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 1d460994027..093fe1e8e99 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1120,12 +1120,11 @@ public class Client { if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); - Call call = calls.get(callId); RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value - calls.remove(callId); + final Call call = calls.remove(callId); call.setRpcResponse(value); // verify that length was correct @@ -1157,7 +1156,7 @@ public class Client { } RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode); if (status == RpcStatusProto.ERROR) { - calls.remove(callId); + final Call call = calls.remove(callId); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection @@ -1288,85 +1287,6 @@ public class Client { clientExcecutorFactory.unrefAndCleanup(); } - /** - * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)} - * for RPC_BUILTIN - */ - public Writable call(Writable param, InetSocketAddress address) - throws IOException { - ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0, - conf); - return call(RpcKind.RPC_BUILTIN, param, remoteId); - } - - /** - * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, - * Class, UserGroupInformation, int, Configuration)} - * except that rpcKind is writable. - */ - public Writable call(Writable param, InetSocketAddress addr, - Class protocol, UserGroupInformation ticket, - int rpcTimeout, Configuration conf) throws IOException { - ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); - return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); - } - - /** - * Same as {@link #call(Writable, InetSocketAddress, - * Class, UserGroupInformation, int, Configuration)} - * except that specifying serviceClass. - */ - public Writable call(Writable param, InetSocketAddress addr, - Class protocol, UserGroupInformation ticket, - int rpcTimeout, int serviceClass, Configuration conf) - throws IOException { - ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); - return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass); - } - - /** - * Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, rpcTimeout as - * timeout and conf as conf for this connection, returning the - * value. Throws exceptions if there are network problems or if the remote - * code threw an exception. - */ - public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, - Class protocol, UserGroupInformation ticket, - int rpcTimeout, Configuration conf) throws IOException { - ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); - return call(rpcKind, param, remoteId); - } - - /** - * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)} - * except the rpcKind is RPC_BUILTIN - */ - public Writable call(Writable param, ConnectionId remoteId) - throws IOException { - return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); - } - - /** - * Make a call, passing rpcRequest, to the IPC server defined by - * remoteId, returning the rpc respond. - * - * @param rpcKind - * @param rpcRequest - contains serialized method and method parameters - * @param remoteId - the target rpc server - * @returns the rpc response - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - */ - public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, - ConnectionId remoteId) throws IOException { - return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT); - } - /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc respond. @@ -1387,23 +1307,6 @@ public class Client { fallbackToSimpleAuth); } - /** - * Make a call, passing rpcRequest, to the IPC server defined by - * remoteId, returning the rpc response. - * - * @param rpcKind - * @param rpcRequest - contains serialized method and method parameters - * @param remoteId - the target rpc server - * @param serviceClass - service class for RPC - * @returns the rpc response - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - */ - public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, - ConnectionId remoteId, int serviceClass) throws IOException { - return call(rpcKind, rpcRequest, remoteId, serviceClass, null); - } - /** * Make a call, passing rpcRequest, to the IPC server defined by * remoteId, returning the rpc response. @@ -1418,7 +1321,7 @@ public class Client { * Throws exceptions if there are network problems or if the remote code * threw an exception. */ - public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, + Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { final Call call = createCall(rpcKind, rpcRequest); @@ -1620,12 +1523,6 @@ public class Client { return saslQop; } - static ConnectionId getConnectionId(InetSocketAddress addr, - Class protocol, UserGroupInformation ticket, int rpcTimeout, - Configuration conf) throws IOException { - return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf); - } - /** * Returns a ConnectionId object. * @param addr Remote address for the connection. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 8392c9117b4..c88a0441cb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -221,7 +221,7 @@ public class WritableRpcEngine implements RpcEngine { int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, - ticket, rpcTimeout, conf); + ticket, rpcTimeout, null, conf); this.client = CLIENTS.getClient(conf, factory); this.fallbackToSimpleAuth = fallbackToSimpleAuth; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 78dcdcd9c4c..d65818238eb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; -import com.google.common.base.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -63,7 +62,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; @@ -78,9 +76,9 @@ import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Assume; @@ -91,6 +89,7 @@ import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.base.Supplier; import com.google.common.primitives.Bytes; import com.google.common.primitives.Ints; @@ -122,6 +121,33 @@ public class TestIPC { /** Directory where we can count open file descriptors on Linux */ private static final File FD_DIR = new File("/proc/self/fd"); + static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout, + Configuration conf) throws IOException { + return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null, + conf); + } + + static Writable call(Client client, InetSocketAddress addr, + int serviceClass, Configuration conf) throws IOException { + final LongWritable param = new LongWritable(RANDOM.nextLong()); + final ConnectionId remoteId = getConnectionId(addr, MIN_SLEEP_TIME, conf); + return client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass, + null); + } + + static LongWritable call(Client client, long param, InetSocketAddress addr, + Configuration conf) throws IOException { + return call(client, new LongWritable(param), addr, 0, conf); + } + + static LongWritable call(Client client, LongWritable param, + InetSocketAddress addr, int rpcTimeout, Configuration conf) + throws IOException { + final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf); + return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, + RPC.RPC_SERVICE_CLASS_DEFAULT, null); + } + private static class TestServer extends Server { // Tests can set callListener to run a piece of code each time the server // receives a call. This code executes on the server thread, so it has @@ -183,10 +209,9 @@ public class TestIPC { public void run() { for (int i = 0; i < count; i++) { try { - LongWritable param = new LongWritable(RANDOM.nextLong()); - LongWritable value = - (LongWritable)client.call(param, server, null, null, 0, conf); - if (!param.equals(value)) { + final long param = RANDOM.nextLong(); + LongWritable value = call(client, param, server, conf); + if (value.get() != param) { LOG.fatal("Call failed!"); failed = true; break; @@ -226,9 +251,8 @@ public class TestIPC { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - LongWritable param = new LongWritable(RANDOM.nextLong()); - LongWritable value = (LongWritable) client.call(param, - NetUtils.getConnectAddress(server), null, null, 0, conf); + LongWritable value = call(client, RANDOM.nextLong(), + NetUtils.getConnectAddress(server), conf); return returnValue(value); } @@ -298,8 +322,7 @@ public class TestIPC { Client client = new Client(LongWritable.class, conf); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); try { - client.call(new LongWritable(RANDOM.nextLong()), - address, null, null, 0, conf); + call(client, RANDOM.nextLong(), address, conf); fail("Expected an exception to have been thrown"); } catch (IOException e) { String message = e.getMessage(); @@ -412,7 +435,7 @@ public class TestIPC { LongWritable param = clientParamClass.newInstance(); try { - client.call(param, addr, null, null, 0, conf); + call(client, param, addr, 0, conf); fail("Expected an exception to have been thrown"); } catch (Throwable t) { assertExceptionContains(t, "Injected fault"); @@ -422,7 +445,7 @@ public class TestIPC { // ie the internal state of the client or server should not be broken // by the failed call WRITABLE_FAULTS_ENABLED = false; - client.call(param, addr, null, null, 0, conf); + call(client, param, addr, 0, conf); } finally { client.stop(); @@ -536,8 +559,7 @@ public class TestIPC { InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); try { - client.call(new LongWritable(RANDOM.nextLong()), - address, null, null, 0, conf); + call(client, RANDOM.nextLong(), address, conf); fail("Expected an exception to have been thrown"); } catch (IOException e) { assertTrue(e.getMessage().contains("Injected fault")); @@ -574,8 +596,7 @@ public class TestIPC { // Call should fail due to injected exception. InetSocketAddress address = NetUtils.getConnectAddress(server); try { - client.call(new LongWritable(RANDOM.nextLong()), - address, null, null, 0, conf); + call(client, RANDOM.nextLong(), address, conf); fail("Expected an exception to have been thrown"); } catch (Exception e) { LOG.info("caught expected exception", e); @@ -586,8 +607,7 @@ public class TestIPC { // (i.e. it should not have cached a half-constructed connection) Mockito.reset(spyFactory); - client.call(new LongWritable(RANDOM.nextLong()), - address, null, null, 0, conf); + call(client, RANDOM.nextLong(), address, conf); } finally { client.stop(); server.stop(); @@ -605,15 +625,15 @@ public class TestIPC { Client client = new Client(LongWritable.class, conf); // set timeout to be less than MIN_SLEEP_TIME try { - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, MIN_SLEEP_TIME/2, conf); + call(client, new LongWritable(RANDOM.nextLong()), addr, + MIN_SLEEP_TIME / 2, conf); fail("Expected an exception to have been thrown"); } catch (SocketTimeoutException e) { LOG.info("Get a SocketTimeoutException ", e); } // set timeout to be bigger than 3*ping interval - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); + call(client, new LongWritable(RANDOM.nextLong()), addr, + 3 * PING_INTERVAL + MIN_SLEEP_TIME, conf); client.stop(); } @@ -629,8 +649,8 @@ public class TestIPC { Client client = new Client(LongWritable.class, conf); // set the rpc timeout to twice the MIN_SLEEP_TIME try { - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, MIN_SLEEP_TIME*2, conf); + call(client, new LongWritable(RANDOM.nextLong()), addr, + MIN_SLEEP_TIME * 2, conf); fail("Expected an exception to have been thrown"); } catch (SocketTimeoutException e) { LOG.info("Get a SocketTimeoutException ", e); @@ -743,8 +763,8 @@ public class TestIPC { public void run() { Client client = new Client(LongWritable.class, conf); try { - client.call(new LongWritable(Thread.currentThread().getId()), - addr, null, null, 60000, conf); + call(client, new LongWritable(Thread.currentThread().getId()), + addr, 60000, conf); } catch (Throwable e) { LOG.error(e); failures.incrementAndGet(); @@ -875,8 +895,7 @@ public class TestIPC { public void run() { Client client = new Client(LongWritable.class, clientConf); try { - client.call(new LongWritable(Thread.currentThread().getId()), - addr, null, null, 0, clientConf); + call(client, Thread.currentThread().getId(), addr, clientConf); callReturned.countDown(); Thread.sleep(10000); } catch (IOException e) { @@ -931,16 +950,15 @@ public class TestIPC { } } } - + /** * Make a call from a client and verify if header info is changed in server side */ - private void callAndVerify(Server server, InetSocketAddress addr, + private static void callAndVerify(Server server, InetSocketAddress addr, int serviceClass, boolean noChanged) throws IOException{ Client client = new Client(LongWritable.class, conf); - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, MIN_SLEEP_TIME, serviceClass, conf); + call(client, addr, serviceClass, conf); Connection connection = server.getConnections()[0]; int serviceClass2 = connection.getServiceClass(); assertFalse(noChanged ^ serviceClass == serviceClass2); @@ -956,13 +974,11 @@ public class TestIPC { // start client Client client = new Client(LongWritable.class, conf); - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, MIN_SLEEP_TIME, 0, conf); + call(client, addr, 0, conf); client.stop(); // This call should throw IOException. - client.call(new LongWritable(RANDOM.nextLong()), - addr, null, null, MIN_SLEEP_TIME, 0, conf); + call(client, addr, 0, conf); } /** @@ -992,7 +1008,7 @@ public class TestIPC { @Test(timeout=30000) public void testInterrupted() { Client client = new Client(LongWritable.class, conf); - client.getClientExecutor().submit(new Runnable() { + Client.getClientExecutor().submit(new Runnable() { public void run() { while(true); } @@ -1007,7 +1023,7 @@ public class TestIPC { Assert.fail("The Client did not interrupt after handling an Interrupted Exception"); } // Clear Thread interrupt - Thread.currentThread().interrupted(); + Thread.interrupted(); } private long countOpenFileDescriptors() { @@ -1363,11 +1379,10 @@ public class TestIPC { int maxTimeoutRetries) throws IOException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class); doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket(); - Client client = new Client(IntWritable.class, conf, mockFactory); + Client client = new Client(LongWritable.class, conf, mockFactory); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090); try { - client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0, - conf); + call(client, RANDOM.nextLong(), address, conf); fail("Not throwing the SocketTimeoutException"); } catch (SocketTimeoutException e) { Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries)) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java index a3bf995e5d4..546cb8f472e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -39,6 +39,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.net.NetUtils; import org.junit.Assert; @@ -66,6 +68,14 @@ public class TestIPCServerResponder { BYTES[i] = (byte) ('a' + (i % 26)); } + static Writable call(Client client, Writable param, + InetSocketAddress address) throws IOException { + final ConnectionId remoteId = ConnectionId.getConnectionId(address, null, + null, 0, null, conf); + return client.call(RpcKind.RPC_BUILTIN, param, remoteId, + RPC.RPC_SERVICE_CLASS_DEFAULT, null); + } + private static class TestServer extends Server { private boolean sleep; @@ -113,7 +123,7 @@ public class TestIPCServerResponder { byte[] bytes = new byte[byteSize]; System.arraycopy(BYTES, 0, bytes, 0, byteSize); Writable param = new BytesWritable(bytes); - client.call(param, address); + call(client, param, address); Thread.sleep(RANDOM.nextInt(20)); } catch (Exception e) { LOG.fatal("Caught Exception", e); @@ -209,17 +219,16 @@ public class TestIPCServerResponder { // calls should return immediately, check the sequence number is // increasing - assertEquals(0, - ((IntWritable)client.call(wait0, address)).get()); - assertEquals(1, - ((IntWritable)client.call(wait0, address)).get()); + assertEquals(0, ((IntWritable)call(client, wait0, address)).get()); + assertEquals(1, ((IntWritable)call(client, wait0, address)).get()); // do a call in the background that will have a deferred response final ExecutorService exec = Executors.newCachedThreadPool(); Future future1 = exec.submit(new Callable() { @Override public Integer call() throws IOException { - return ((IntWritable)client.call(wait1, address)).get(); + return ((IntWritable)TestIPCServerResponder.call( + client, wait1, address)).get(); } }); // make sure it blocked @@ -237,14 +246,14 @@ public class TestIPCServerResponder { // proves the handler isn't tied up, and that the prior sequence number // was consumed - assertEquals(3, - ((IntWritable)client.call(wait0, address)).get()); + assertEquals(3, ((IntWritable)call(client, wait0, address)).get()); // another call with wait count of 2 Future future2 = exec.submit(new Callable() { @Override public Integer call() throws IOException { - return ((IntWritable)client.call(wait2, address)).get(); + return ((IntWritable)TestIPCServerResponder.call( + client, wait2, address)).get(); } }); // make sure it blocked @@ -286,8 +295,7 @@ public class TestIPCServerResponder { assertFalse(future2.isDone()); // call should return immediately - assertEquals(5, - ((IntWritable)client.call(wait0, address)).get()); + assertEquals(5, ((IntWritable)call(client, wait0, address)).get()); // trigger last waiting call waitingCalls[1].sendResponse(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index a32ea2cabc9..8df3b1ded6e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -18,8 +18,8 @@ package org.apache.hadoop.ipc; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS; import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE; import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN; @@ -42,16 +42,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -454,7 +451,12 @@ public class TestSaslRPC { } } } - + + static ConnectionId getConnectionId(Configuration conf) throws IOException { + return ConnectionId.getConnectionId(new InetSocketAddress(0), + TestSaslProtocol.class, null, 0, null, conf); + } + @Test public void testPingInterval() throws Exception { Configuration newConf = new Configuration(conf); @@ -464,14 +466,12 @@ public class TestSaslRPC { // set doPing to true newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); - ConnectionId remoteId = ConnectionId.getConnectionId( - new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf); + ConnectionId remoteId = getConnectionId(newConf); assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT, remoteId.getPingInterval()); // set doPing to false newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); - remoteId = ConnectionId.getConnectionId( - new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf); + remoteId = getConnectionId(newConf); assertEquals(0, remoteId.getPingInterval()); }