HADOOP-12923. Move the test code in ipc.Client to test.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-03-14 15:48:01 -07:00
parent 8ac3fc7540
commit f85d979a3f
5 changed files with 90 additions and 170 deletions

View File

@ -1120,12 +1120,11 @@ private void receiveRpcResponse() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);
Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
calls.remove(callId);
final Call call = calls.remove(callId);
call.setRpcResponse(value);
// verify that length was correct
@ -1157,7 +1156,7 @@ private void receiveRpcResponse() {
}
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
if (status == RpcStatusProto.ERROR) {
calls.remove(callId);
final Call call = calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
@ -1288,85 +1287,6 @@ public void stop() {
clientExcecutorFactory.unrefAndCleanup();
}
/**
* Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
* for RPC_BUILTIN
*/
public Writable call(Writable param, InetSocketAddress address)
throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
conf);
return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
* Same as {@link #call(Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that specifying serviceClass.
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, int serviceClass, Configuration conf)
throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
}
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
* timeout and <code>conf</code> as conf for this connection, returning the
* value. Throws exceptions if there are network problems or if the remote
* code threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(rpcKind, param, remoteId);
}
/**
* Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
* except the rpcKind is RPC_BUILTIN
*/
public Writable call(Writable param, ConnectionId remoteId)
throws IOException {
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId) throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
@ -1387,23 +1307,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@ -1418,7 +1321,7 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
@ -1620,12 +1523,6 @@ String getSaslQop() {
return saslQop;
}
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
Configuration conf) throws IOException {
return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
}
/**
* Returns a ConnectionId object.
* @param addr Remote address for the connection.

View File

@ -221,7 +221,7 @@ public Invoker(Class<?> protocol,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
ticket, rpcTimeout, null, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}

View File

@ -55,7 +55,6 @@
import javax.net.SocketFactory;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@ -63,7 +62,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
@ -78,9 +76,9 @@
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
@ -91,6 +89,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
@ -122,6 +121,33 @@ public void setupConf() {
/** Directory where we can count open file descriptors on Linux */
private static final File FD_DIR = new File("/proc/self/fd");
static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
Configuration conf) throws IOException {
return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
conf);
}
static Writable call(Client client, InetSocketAddress addr,
int serviceClass, Configuration conf) throws IOException {
final LongWritable param = new LongWritable(RANDOM.nextLong());
final ConnectionId remoteId = getConnectionId(addr, MIN_SLEEP_TIME, conf);
return client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass,
null);
}
static LongWritable call(Client client, long param, InetSocketAddress addr,
Configuration conf) throws IOException {
return call(client, new LongWritable(param), addr, 0, conf);
}
static LongWritable call(Client client, LongWritable param,
InetSocketAddress addr, int rpcTimeout, Configuration conf)
throws IOException {
final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
}
private static class TestServer extends Server {
// Tests can set callListener to run a piece of code each time the server
// receives a call. This code executes on the server thread, so it has
@ -183,10 +209,9 @@ public SerialCaller(Client client, InetSocketAddress server, int count) {
public void run() {
for (int i = 0; i < count; i++) {
try {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value =
(LongWritable)client.call(param, server, null, null, 0, conf);
if (!param.equals(value)) {
final long param = RANDOM.nextLong();
LongWritable value = call(client, param, server, conf);
if (value.get() != param) {
LOG.fatal("Call failed!");
failed = true;
break;
@ -226,9 +251,8 @@ protected Object returnValue(Object value) throws Exception {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value = (LongWritable) client.call(param,
NetUtils.getConnectAddress(server), null, null, 0, conf);
LongWritable value = call(client, RANDOM.nextLong(),
NetUtils.getConnectAddress(server), conf);
return returnValue(value);
}
@ -298,8 +322,7 @@ public void testStandAloneClient() throws IOException {
Client client = new Client(LongWritable.class, conf);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
String message = e.getMessage();
@ -412,7 +435,7 @@ private void doErrorTest(
LongWritable param = clientParamClass.newInstance();
try {
client.call(param, addr, null, null, 0, conf);
call(client, param, addr, 0, conf);
fail("Expected an exception to have been thrown");
} catch (Throwable t) {
assertExceptionContains(t, "Injected fault");
@ -422,7 +445,7 @@ private void doErrorTest(
// ie the internal state of the client or server should not be broken
// by the failed call
WRITABLE_FAULTS_ENABLED = false;
client.call(param, addr, null, null, 0, conf);
call(client, param, addr, 0, conf);
} finally {
client.stop();
@ -536,8 +559,7 @@ public void testSocketFactoryException() throws IOException {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault"));
@ -574,8 +596,7 @@ public Socket answer(InvocationOnMock invocation) throws Throwable {
// Call should fail due to injected exception.
InetSocketAddress address = NetUtils.getConnectAddress(server);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
call(client, RANDOM.nextLong(), address, conf);
fail("Expected an exception to have been thrown");
} catch (Exception e) {
LOG.info("caught expected exception", e);
@ -586,8 +607,7 @@ public Socket answer(InvocationOnMock invocation) throws Throwable {
// (i.e. it should not have cached a half-constructed connection)
Mockito.reset(spyFactory);
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
call(client, RANDOM.nextLong(), address, conf);
} finally {
client.stop();
server.stop();
@ -605,15 +625,15 @@ public void testIpcTimeout() throws IOException {
Client client = new Client(LongWritable.class, conf);
// set timeout to be less than MIN_SLEEP_TIME
try {
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME/2, conf);
call(client, new LongWritable(RANDOM.nextLong()), addr,
MIN_SLEEP_TIME / 2, conf);
fail("Expected an exception to have been thrown");
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
}
// set timeout to be bigger than 3*ping interval
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
call(client, new LongWritable(RANDOM.nextLong()), addr,
3 * PING_INTERVAL + MIN_SLEEP_TIME, conf);
client.stop();
}
@ -629,8 +649,8 @@ public void testIpcConnectTimeout() throws IOException {
Client client = new Client(LongWritable.class, conf);
// set the rpc timeout to twice the MIN_SLEEP_TIME
try {
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME*2, conf);
call(client, new LongWritable(RANDOM.nextLong()), addr,
MIN_SLEEP_TIME * 2, conf);
fail("Expected an exception to have been thrown");
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
@ -743,8 +763,8 @@ private void checkBlocking(int readers, int readerQ, int callQ) throws Exception
public void run() {
Client client = new Client(LongWritable.class, conf);
try {
client.call(new LongWritable(Thread.currentThread().getId()),
addr, null, null, 60000, conf);
call(client, new LongWritable(Thread.currentThread().getId()),
addr, 60000, conf);
} catch (Throwable e) {
LOG.error(e);
failures.incrementAndGet();
@ -875,8 +895,7 @@ public void run() {
public void run() {
Client client = new Client(LongWritable.class, clientConf);
try {
client.call(new LongWritable(Thread.currentThread().getId()),
addr, null, null, 0, clientConf);
call(client, Thread.currentThread().getId(), addr, clientConf);
callReturned.countDown();
Thread.sleep(10000);
} catch (IOException e) {
@ -935,12 +954,11 @@ public void run() {
/**
* Make a call from a client and verify if header info is changed in server side
*/
private void callAndVerify(Server server, InetSocketAddress addr,
private static void callAndVerify(Server server, InetSocketAddress addr,
int serviceClass, boolean noChanged) throws IOException{
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
call(client, addr, serviceClass, conf);
Connection connection = server.getConnections()[0];
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
@ -956,13 +974,11 @@ public void testIpcAfterStopping() throws IOException {
// start client
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, 0, conf);
call(client, addr, 0, conf);
client.stop();
// This call should throw IOException.
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, 0, conf);
call(client, addr, 0, conf);
}
/**
@ -992,7 +1008,7 @@ public void testSocketLeak() throws IOException {
@Test(timeout=30000)
public void testInterrupted() {
Client client = new Client(LongWritable.class, conf);
client.getClientExecutor().submit(new Runnable() {
Client.getClientExecutor().submit(new Runnable() {
public void run() {
while(true);
}
@ -1007,7 +1023,7 @@ public void run() {
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
}
// Clear Thread interrupt
Thread.currentThread().interrupted();
Thread.interrupted();
}
private long countOpenFileDescriptors() {
@ -1363,11 +1379,10 @@ private void assertRetriesOnSocketTimeouts(Configuration conf,
int maxTimeoutRetries) throws IOException {
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
Client client = new Client(IntWritable.class, conf, mockFactory);
Client client = new Client(LongWritable.class, conf, mockFactory);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
try {
client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0,
conf);
call(client, RANDOM.nextLong(), address, conf);
fail("Not throwing the SocketTimeoutException");
} catch (SocketTimeoutException e) {
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))

View File

@ -39,6 +39,8 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
@ -66,6 +68,14 @@ public class TestIPCServerResponder {
BYTES[i] = (byte) ('a' + (i % 26));
}
static Writable call(Client client, Writable param,
InetSocketAddress address) throws IOException {
final ConnectionId remoteId = ConnectionId.getConnectionId(address, null,
null, 0, null, conf);
return client.call(RpcKind.RPC_BUILTIN, param, remoteId,
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
}
private static class TestServer extends Server {
private boolean sleep;
@ -113,7 +123,7 @@ public void run() {
byte[] bytes = new byte[byteSize];
System.arraycopy(BYTES, 0, bytes, 0, byteSize);
Writable param = new BytesWritable(bytes);
client.call(param, address);
call(client, param, address);
Thread.sleep(RANDOM.nextInt(20));
} catch (Exception e) {
LOG.fatal("Caught Exception", e);
@ -209,17 +219,16 @@ public Writable call(RPC.RpcKind rpcKind, String protocol,
// calls should return immediately, check the sequence number is
// increasing
assertEquals(0,
((IntWritable)client.call(wait0, address)).get());
assertEquals(1,
((IntWritable)client.call(wait0, address)).get());
assertEquals(0, ((IntWritable)call(client, wait0, address)).get());
assertEquals(1, ((IntWritable)call(client, wait0, address)).get());
// do a call in the background that will have a deferred response
final ExecutorService exec = Executors.newCachedThreadPool();
Future<Integer> future1 = exec.submit(new Callable<Integer>() {
@Override
public Integer call() throws IOException {
return ((IntWritable)client.call(wait1, address)).get();
return ((IntWritable)TestIPCServerResponder.call(
client, wait1, address)).get();
}
});
// make sure it blocked
@ -237,14 +246,14 @@ public Integer call() throws IOException {
// proves the handler isn't tied up, and that the prior sequence number
// was consumed
assertEquals(3,
((IntWritable)client.call(wait0, address)).get());
assertEquals(3, ((IntWritable)call(client, wait0, address)).get());
// another call with wait count of 2
Future<Integer> future2 = exec.submit(new Callable<Integer>() {
@Override
public Integer call() throws IOException {
return ((IntWritable)client.call(wait2, address)).get();
return ((IntWritable)TestIPCServerResponder.call(
client, wait2, address)).get();
}
});
// make sure it blocked
@ -286,8 +295,7 @@ public Integer call() throws IOException {
assertFalse(future2.isDone());
// call should return immediately
assertEquals(5,
((IntWritable)client.call(wait0, address)).get());
assertEquals(5, ((IntWritable)call(client, wait0, address)).get());
// trigger last waiting call
waitingCalls[1].sendResponse();

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
@ -42,16 +42,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -455,6 +452,11 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm
}
}
static ConnectionId getConnectionId(Configuration conf) throws IOException {
return ConnectionId.getConnectionId(new InetSocketAddress(0),
TestSaslProtocol.class, null, 0, null, conf);
}
@Test
public void testPingInterval() throws Exception {
Configuration newConf = new Configuration(conf);
@ -464,14 +466,12 @@ public void testPingInterval() throws Exception {
// set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
ConnectionId remoteId = ConnectionId.getConnectionId(
new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
ConnectionId remoteId = getConnectionId(newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval());
// set doPing to false
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
remoteId = ConnectionId.getConnectionId(
new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
remoteId = getConnectionId(newConf);
assertEquals(0, remoteId.getPingInterval());
}