HADOOP-12923. Move the test code in ipc.Client to test.
This commit is contained in:
parent
9a43094e12
commit
1898810cda
|
@ -1120,12 +1120,11 @@ public class Client {
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug(getName() + " got value #" + callId);
|
LOG.debug(getName() + " got value #" + callId);
|
||||||
|
|
||||||
Call call = calls.get(callId);
|
|
||||||
RpcStatusProto status = header.getStatus();
|
RpcStatusProto status = header.getStatus();
|
||||||
if (status == RpcStatusProto.SUCCESS) {
|
if (status == RpcStatusProto.SUCCESS) {
|
||||||
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
||||||
value.readFields(in); // read value
|
value.readFields(in); // read value
|
||||||
calls.remove(callId);
|
final Call call = calls.remove(callId);
|
||||||
call.setRpcResponse(value);
|
call.setRpcResponse(value);
|
||||||
|
|
||||||
// verify that length was correct
|
// verify that length was correct
|
||||||
|
@ -1157,7 +1156,7 @@ public class Client {
|
||||||
}
|
}
|
||||||
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
|
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
|
||||||
if (status == RpcStatusProto.ERROR) {
|
if (status == RpcStatusProto.ERROR) {
|
||||||
calls.remove(callId);
|
final Call call = calls.remove(callId);
|
||||||
call.setException(re);
|
call.setException(re);
|
||||||
} else if (status == RpcStatusProto.FATAL) {
|
} else if (status == RpcStatusProto.FATAL) {
|
||||||
// Close the connection
|
// Close the connection
|
||||||
|
@ -1288,85 +1287,6 @@ public class Client {
|
||||||
clientExcecutorFactory.unrefAndCleanup();
|
clientExcecutorFactory.unrefAndCleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
|
|
||||||
* for RPC_BUILTIN
|
|
||||||
*/
|
|
||||||
public Writable call(Writable param, InetSocketAddress address)
|
|
||||||
throws IOException {
|
|
||||||
ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
|
|
||||||
conf);
|
|
||||||
return call(RpcKind.RPC_BUILTIN, param, remoteId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
|
|
||||||
* Class, UserGroupInformation, int, Configuration)}
|
|
||||||
* except that rpcKind is writable.
|
|
||||||
*/
|
|
||||||
public Writable call(Writable param, InetSocketAddress addr,
|
|
||||||
Class<?> protocol, UserGroupInformation ticket,
|
|
||||||
int rpcTimeout, Configuration conf) throws IOException {
|
|
||||||
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
||||||
ticket, rpcTimeout, conf);
|
|
||||||
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Same as {@link #call(Writable, InetSocketAddress,
|
|
||||||
* Class, UserGroupInformation, int, Configuration)}
|
|
||||||
* except that specifying serviceClass.
|
|
||||||
*/
|
|
||||||
public Writable call(Writable param, InetSocketAddress addr,
|
|
||||||
Class<?> protocol, UserGroupInformation ticket,
|
|
||||||
int rpcTimeout, int serviceClass, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
||||||
ticket, rpcTimeout, conf);
|
|
||||||
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make a call, passing <code>param</code>, to the IPC server running at
|
|
||||||
* <code>address</code> which is servicing the <code>protocol</code> protocol,
|
|
||||||
* with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
|
|
||||||
* timeout and <code>conf</code> as conf for this connection, returning the
|
|
||||||
* value. Throws exceptions if there are network problems or if the remote
|
|
||||||
* code threw an exception.
|
|
||||||
*/
|
|
||||||
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
||||||
Class<?> protocol, UserGroupInformation ticket,
|
|
||||||
int rpcTimeout, Configuration conf) throws IOException {
|
|
||||||
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
||||||
ticket, rpcTimeout, conf);
|
|
||||||
return call(rpcKind, param, remoteId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
|
|
||||||
* except the rpcKind is RPC_BUILTIN
|
|
||||||
*/
|
|
||||||
public Writable call(Writable param, ConnectionId remoteId)
|
|
||||||
throws IOException {
|
|
||||||
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
|
||||||
* <code>remoteId</code>, returning the rpc respond.
|
|
||||||
*
|
|
||||||
* @param rpcKind
|
|
||||||
* @param rpcRequest - contains serialized method and method parameters
|
|
||||||
* @param remoteId - the target rpc server
|
|
||||||
* @returns the rpc response
|
|
||||||
* Throws exceptions if there are network problems or if the remote code
|
|
||||||
* threw an exception.
|
|
||||||
*/
|
|
||||||
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
|
|
||||||
ConnectionId remoteId) throws IOException {
|
|
||||||
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
||||||
* <code>remoteId</code>, returning the rpc respond.
|
* <code>remoteId</code>, returning the rpc respond.
|
||||||
|
@ -1387,23 +1307,6 @@ public class Client {
|
||||||
fallbackToSimpleAuth);
|
fallbackToSimpleAuth);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
|
||||||
* <code>remoteId</code>, returning the rpc response.
|
|
||||||
*
|
|
||||||
* @param rpcKind
|
|
||||||
* @param rpcRequest - contains serialized method and method parameters
|
|
||||||
* @param remoteId - the target rpc server
|
|
||||||
* @param serviceClass - service class for RPC
|
|
||||||
* @returns the rpc response
|
|
||||||
* Throws exceptions if there are network problems or if the remote code
|
|
||||||
* threw an exception.
|
|
||||||
*/
|
|
||||||
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
|
|
||||||
ConnectionId remoteId, int serviceClass) throws IOException {
|
|
||||||
return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
|
||||||
* <code>remoteId</code>, returning the rpc response.
|
* <code>remoteId</code>, returning the rpc response.
|
||||||
|
@ -1418,7 +1321,7 @@ public class Client {
|
||||||
* Throws exceptions if there are network problems or if the remote code
|
* Throws exceptions if there are network problems or if the remote code
|
||||||
* threw an exception.
|
* threw an exception.
|
||||||
*/
|
*/
|
||||||
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
|
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
|
||||||
ConnectionId remoteId, int serviceClass,
|
ConnectionId remoteId, int serviceClass,
|
||||||
AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
AtomicBoolean fallbackToSimpleAuth) throws IOException {
|
||||||
final Call call = createCall(rpcKind, rpcRequest);
|
final Call call = createCall(rpcKind, rpcRequest);
|
||||||
|
@ -1620,12 +1523,6 @@ public class Client {
|
||||||
return saslQop;
|
return saslQop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ConnectionId getConnectionId(InetSocketAddress addr,
|
|
||||||
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
|
|
||||||
Configuration conf) throws IOException {
|
|
||||||
return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a ConnectionId object.
|
* Returns a ConnectionId object.
|
||||||
* @param addr Remote address for the connection.
|
* @param addr Remote address for the connection.
|
||||||
|
|
|
@ -221,7 +221,7 @@ public class WritableRpcEngine implements RpcEngine {
|
||||||
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
|
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
|
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
|
||||||
ticket, rpcTimeout, conf);
|
ticket, rpcTimeout, null, conf);
|
||||||
this.client = CLIENTS.getClient(conf, factory);
|
this.client = CLIENTS.getClient(conf, factory);
|
||||||
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
@ -63,7 +62,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
||||||
|
@ -78,9 +76,9 @@ import org.apache.hadoop.ipc.Server.Connection;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
|
@ -91,6 +89,7 @@ import org.mockito.internal.util.reflection.Whitebox;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.primitives.Bytes;
|
import com.google.common.primitives.Bytes;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
|
@ -122,6 +121,33 @@ public class TestIPC {
|
||||||
/** Directory where we can count open file descriptors on Linux */
|
/** Directory where we can count open file descriptors on Linux */
|
||||||
private static final File FD_DIR = new File("/proc/self/fd");
|
private static final File FD_DIR = new File("/proc/self/fd");
|
||||||
|
|
||||||
|
static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
|
||||||
|
conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Writable call(Client client, InetSocketAddress addr,
|
||||||
|
int serviceClass, Configuration conf) throws IOException {
|
||||||
|
final LongWritable param = new LongWritable(RANDOM.nextLong());
|
||||||
|
final ConnectionId remoteId = getConnectionId(addr, MIN_SLEEP_TIME, conf);
|
||||||
|
return client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass,
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
static LongWritable call(Client client, long param, InetSocketAddress addr,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return call(client, new LongWritable(param), addr, 0, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static LongWritable call(Client client, LongWritable param,
|
||||||
|
InetSocketAddress addr, int rpcTimeout, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
|
||||||
|
return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
|
||||||
|
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestServer extends Server {
|
private static class TestServer extends Server {
|
||||||
// Tests can set callListener to run a piece of code each time the server
|
// Tests can set callListener to run a piece of code each time the server
|
||||||
// receives a call. This code executes on the server thread, so it has
|
// receives a call. This code executes on the server thread, so it has
|
||||||
|
@ -183,10 +209,9 @@ public class TestIPC {
|
||||||
public void run() {
|
public void run() {
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
try {
|
try {
|
||||||
LongWritable param = new LongWritable(RANDOM.nextLong());
|
final long param = RANDOM.nextLong();
|
||||||
LongWritable value =
|
LongWritable value = call(client, param, server, conf);
|
||||||
(LongWritable)client.call(param, server, null, null, 0, conf);
|
if (value.get() != param) {
|
||||||
if (!param.equals(value)) {
|
|
||||||
LOG.fatal("Call failed!");
|
LOG.fatal("Call failed!");
|
||||||
failed = true;
|
failed = true;
|
||||||
break;
|
break;
|
||||||
|
@ -226,9 +251,8 @@ public class TestIPC {
|
||||||
@Override
|
@Override
|
||||||
public Object invoke(Object proxy, Method method, Object[] args)
|
public Object invoke(Object proxy, Method method, Object[] args)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
LongWritable param = new LongWritable(RANDOM.nextLong());
|
LongWritable value = call(client, RANDOM.nextLong(),
|
||||||
LongWritable value = (LongWritable) client.call(param,
|
NetUtils.getConnectAddress(server), conf);
|
||||||
NetUtils.getConnectAddress(server), null, null, 0, conf);
|
|
||||||
return returnValue(value);
|
return returnValue(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -298,8 +322,7 @@ public class TestIPC {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, RANDOM.nextLong(), address, conf);
|
||||||
address, null, null, 0, conf);
|
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
String message = e.getMessage();
|
String message = e.getMessage();
|
||||||
|
@ -412,7 +435,7 @@ public class TestIPC {
|
||||||
LongWritable param = clientParamClass.newInstance();
|
LongWritable param = clientParamClass.newInstance();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
client.call(param, addr, null, null, 0, conf);
|
call(client, param, addr, 0, conf);
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
assertExceptionContains(t, "Injected fault");
|
assertExceptionContains(t, "Injected fault");
|
||||||
|
@ -422,7 +445,7 @@ public class TestIPC {
|
||||||
// ie the internal state of the client or server should not be broken
|
// ie the internal state of the client or server should not be broken
|
||||||
// by the failed call
|
// by the failed call
|
||||||
WRITABLE_FAULTS_ENABLED = false;
|
WRITABLE_FAULTS_ENABLED = false;
|
||||||
client.call(param, addr, null, null, 0, conf);
|
call(client, param, addr, 0, conf);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
|
@ -536,8 +559,7 @@ public class TestIPC {
|
||||||
|
|
||||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, RANDOM.nextLong(), address, conf);
|
||||||
address, null, null, 0, conf);
|
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertTrue(e.getMessage().contains("Injected fault"));
|
assertTrue(e.getMessage().contains("Injected fault"));
|
||||||
|
@ -574,8 +596,7 @@ public class TestIPC {
|
||||||
// Call should fail due to injected exception.
|
// Call should fail due to injected exception.
|
||||||
InetSocketAddress address = NetUtils.getConnectAddress(server);
|
InetSocketAddress address = NetUtils.getConnectAddress(server);
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, RANDOM.nextLong(), address, conf);
|
||||||
address, null, null, 0, conf);
|
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("caught expected exception", e);
|
LOG.info("caught expected exception", e);
|
||||||
|
@ -586,8 +607,7 @@ public class TestIPC {
|
||||||
// (i.e. it should not have cached a half-constructed connection)
|
// (i.e. it should not have cached a half-constructed connection)
|
||||||
|
|
||||||
Mockito.reset(spyFactory);
|
Mockito.reset(spyFactory);
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, RANDOM.nextLong(), address, conf);
|
||||||
address, null, null, 0, conf);
|
|
||||||
} finally {
|
} finally {
|
||||||
client.stop();
|
client.stop();
|
||||||
server.stop();
|
server.stop();
|
||||||
|
@ -605,15 +625,15 @@ public class TestIPC {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
// set timeout to be less than MIN_SLEEP_TIME
|
// set timeout to be less than MIN_SLEEP_TIME
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, new LongWritable(RANDOM.nextLong()), addr,
|
||||||
addr, null, null, MIN_SLEEP_TIME/2, conf);
|
MIN_SLEEP_TIME / 2, conf);
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
LOG.info("Get a SocketTimeoutException ", e);
|
LOG.info("Get a SocketTimeoutException ", e);
|
||||||
}
|
}
|
||||||
// set timeout to be bigger than 3*ping interval
|
// set timeout to be bigger than 3*ping interval
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, new LongWritable(RANDOM.nextLong()), addr,
|
||||||
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
|
3 * PING_INTERVAL + MIN_SLEEP_TIME, conf);
|
||||||
client.stop();
|
client.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -629,8 +649,8 @@ public class TestIPC {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
// set the rpc timeout to twice the MIN_SLEEP_TIME
|
// set the rpc timeout to twice the MIN_SLEEP_TIME
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, new LongWritable(RANDOM.nextLong()), addr,
|
||||||
addr, null, null, MIN_SLEEP_TIME*2, conf);
|
MIN_SLEEP_TIME * 2, conf);
|
||||||
fail("Expected an exception to have been thrown");
|
fail("Expected an exception to have been thrown");
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
LOG.info("Get a SocketTimeoutException ", e);
|
LOG.info("Get a SocketTimeoutException ", e);
|
||||||
|
@ -743,8 +763,8 @@ public class TestIPC {
|
||||||
public void run() {
|
public void run() {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(Thread.currentThread().getId()),
|
call(client, new LongWritable(Thread.currentThread().getId()),
|
||||||
addr, null, null, 60000, conf);
|
addr, 60000, conf);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.error(e);
|
LOG.error(e);
|
||||||
failures.incrementAndGet();
|
failures.incrementAndGet();
|
||||||
|
@ -875,8 +895,7 @@ public class TestIPC {
|
||||||
public void run() {
|
public void run() {
|
||||||
Client client = new Client(LongWritable.class, clientConf);
|
Client client = new Client(LongWritable.class, clientConf);
|
||||||
try {
|
try {
|
||||||
client.call(new LongWritable(Thread.currentThread().getId()),
|
call(client, Thread.currentThread().getId(), addr, clientConf);
|
||||||
addr, null, null, 0, clientConf);
|
|
||||||
callReturned.countDown();
|
callReturned.countDown();
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -931,16 +950,15 @@ public class TestIPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make a call from a client and verify if header info is changed in server side
|
* Make a call from a client and verify if header info is changed in server side
|
||||||
*/
|
*/
|
||||||
private void callAndVerify(Server server, InetSocketAddress addr,
|
private static void callAndVerify(Server server, InetSocketAddress addr,
|
||||||
int serviceClass, boolean noChanged) throws IOException{
|
int serviceClass, boolean noChanged) throws IOException{
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, addr, serviceClass, conf);
|
||||||
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
|
|
||||||
Connection connection = server.getConnections()[0];
|
Connection connection = server.getConnections()[0];
|
||||||
int serviceClass2 = connection.getServiceClass();
|
int serviceClass2 = connection.getServiceClass();
|
||||||
assertFalse(noChanged ^ serviceClass == serviceClass2);
|
assertFalse(noChanged ^ serviceClass == serviceClass2);
|
||||||
|
@ -956,13 +974,11 @@ public class TestIPC {
|
||||||
|
|
||||||
// start client
|
// start client
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, addr, 0, conf);
|
||||||
addr, null, null, MIN_SLEEP_TIME, 0, conf);
|
|
||||||
client.stop();
|
client.stop();
|
||||||
|
|
||||||
// This call should throw IOException.
|
// This call should throw IOException.
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
call(client, addr, 0, conf);
|
||||||
addr, null, null, MIN_SLEEP_TIME, 0, conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -992,7 +1008,7 @@ public class TestIPC {
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testInterrupted() {
|
public void testInterrupted() {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
client.getClientExecutor().submit(new Runnable() {
|
Client.getClientExecutor().submit(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
while(true);
|
while(true);
|
||||||
}
|
}
|
||||||
|
@ -1007,7 +1023,7 @@ public class TestIPC {
|
||||||
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
|
Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
|
||||||
}
|
}
|
||||||
// Clear Thread interrupt
|
// Clear Thread interrupt
|
||||||
Thread.currentThread().interrupted();
|
Thread.interrupted();
|
||||||
}
|
}
|
||||||
|
|
||||||
private long countOpenFileDescriptors() {
|
private long countOpenFileDescriptors() {
|
||||||
|
@ -1363,11 +1379,10 @@ public class TestIPC {
|
||||||
int maxTimeoutRetries) throws IOException {
|
int maxTimeoutRetries) throws IOException {
|
||||||
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
||||||
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
|
doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
|
||||||
Client client = new Client(IntWritable.class, conf, mockFactory);
|
Client client = new Client(LongWritable.class, conf, mockFactory);
|
||||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
|
||||||
try {
|
try {
|
||||||
client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0,
|
call(client, RANDOM.nextLong(), address, conf);
|
||||||
conf);
|
|
||||||
fail("Not throwing the SocketTimeoutException");
|
fail("Not throwing the SocketTimeoutException");
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
||||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.io.BytesWritable;
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
import org.apache.hadoop.ipc.Server.Call;
|
import org.apache.hadoop.ipc.Server.Call;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -66,6 +68,14 @@ public class TestIPCServerResponder {
|
||||||
BYTES[i] = (byte) ('a' + (i % 26));
|
BYTES[i] = (byte) ('a' + (i % 26));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Writable call(Client client, Writable param,
|
||||||
|
InetSocketAddress address) throws IOException {
|
||||||
|
final ConnectionId remoteId = ConnectionId.getConnectionId(address, null,
|
||||||
|
null, 0, null, conf);
|
||||||
|
return client.call(RpcKind.RPC_BUILTIN, param, remoteId,
|
||||||
|
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestServer extends Server {
|
private static class TestServer extends Server {
|
||||||
|
|
||||||
private boolean sleep;
|
private boolean sleep;
|
||||||
|
@ -113,7 +123,7 @@ public class TestIPCServerResponder {
|
||||||
byte[] bytes = new byte[byteSize];
|
byte[] bytes = new byte[byteSize];
|
||||||
System.arraycopy(BYTES, 0, bytes, 0, byteSize);
|
System.arraycopy(BYTES, 0, bytes, 0, byteSize);
|
||||||
Writable param = new BytesWritable(bytes);
|
Writable param = new BytesWritable(bytes);
|
||||||
client.call(param, address);
|
call(client, param, address);
|
||||||
Thread.sleep(RANDOM.nextInt(20));
|
Thread.sleep(RANDOM.nextInt(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.fatal("Caught Exception", e);
|
LOG.fatal("Caught Exception", e);
|
||||||
|
@ -209,17 +219,16 @@ public class TestIPCServerResponder {
|
||||||
|
|
||||||
// calls should return immediately, check the sequence number is
|
// calls should return immediately, check the sequence number is
|
||||||
// increasing
|
// increasing
|
||||||
assertEquals(0,
|
assertEquals(0, ((IntWritable)call(client, wait0, address)).get());
|
||||||
((IntWritable)client.call(wait0, address)).get());
|
assertEquals(1, ((IntWritable)call(client, wait0, address)).get());
|
||||||
assertEquals(1,
|
|
||||||
((IntWritable)client.call(wait0, address)).get());
|
|
||||||
|
|
||||||
// do a call in the background that will have a deferred response
|
// do a call in the background that will have a deferred response
|
||||||
final ExecutorService exec = Executors.newCachedThreadPool();
|
final ExecutorService exec = Executors.newCachedThreadPool();
|
||||||
Future<Integer> future1 = exec.submit(new Callable<Integer>() {
|
Future<Integer> future1 = exec.submit(new Callable<Integer>() {
|
||||||
@Override
|
@Override
|
||||||
public Integer call() throws IOException {
|
public Integer call() throws IOException {
|
||||||
return ((IntWritable)client.call(wait1, address)).get();
|
return ((IntWritable)TestIPCServerResponder.call(
|
||||||
|
client, wait1, address)).get();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// make sure it blocked
|
// make sure it blocked
|
||||||
|
@ -237,14 +246,14 @@ public class TestIPCServerResponder {
|
||||||
|
|
||||||
// proves the handler isn't tied up, and that the prior sequence number
|
// proves the handler isn't tied up, and that the prior sequence number
|
||||||
// was consumed
|
// was consumed
|
||||||
assertEquals(3,
|
assertEquals(3, ((IntWritable)call(client, wait0, address)).get());
|
||||||
((IntWritable)client.call(wait0, address)).get());
|
|
||||||
|
|
||||||
// another call with wait count of 2
|
// another call with wait count of 2
|
||||||
Future<Integer> future2 = exec.submit(new Callable<Integer>() {
|
Future<Integer> future2 = exec.submit(new Callable<Integer>() {
|
||||||
@Override
|
@Override
|
||||||
public Integer call() throws IOException {
|
public Integer call() throws IOException {
|
||||||
return ((IntWritable)client.call(wait2, address)).get();
|
return ((IntWritable)TestIPCServerResponder.call(
|
||||||
|
client, wait2, address)).get();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// make sure it blocked
|
// make sure it blocked
|
||||||
|
@ -286,8 +295,7 @@ public class TestIPCServerResponder {
|
||||||
assertFalse(future2.isDone());
|
assertFalse(future2.isDone());
|
||||||
|
|
||||||
// call should return immediately
|
// call should return immediately
|
||||||
assertEquals(5,
|
assertEquals(5, ((IntWritable)call(client, wait0, address)).get());
|
||||||
((IntWritable)client.call(wait0, address)).get());
|
|
||||||
|
|
||||||
// trigger last waiting call
|
// trigger last waiting call
|
||||||
waitingCalls[1].sendResponse();
|
waitingCalls[1].sendResponse();
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
||||||
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
|
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
|
||||||
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
|
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
|
||||||
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
|
import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
|
||||||
|
@ -42,16 +42,13 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -454,7 +451,12 @@ public class TestSaslRPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ConnectionId getConnectionId(Configuration conf) throws IOException {
|
||||||
|
return ConnectionId.getConnectionId(new InetSocketAddress(0),
|
||||||
|
TestSaslProtocol.class, null, 0, null, conf);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPingInterval() throws Exception {
|
public void testPingInterval() throws Exception {
|
||||||
Configuration newConf = new Configuration(conf);
|
Configuration newConf = new Configuration(conf);
|
||||||
|
@ -464,14 +466,12 @@ public class TestSaslRPC {
|
||||||
|
|
||||||
// set doPing to true
|
// set doPing to true
|
||||||
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
||||||
ConnectionId remoteId = ConnectionId.getConnectionId(
|
ConnectionId remoteId = getConnectionId(newConf);
|
||||||
new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
|
|
||||||
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
|
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
|
||||||
remoteId.getPingInterval());
|
remoteId.getPingInterval());
|
||||||
// set doPing to false
|
// set doPing to false
|
||||||
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
|
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
|
||||||
remoteId = ConnectionId.getConnectionId(
|
remoteId = getConnectionId(newConf);
|
||||||
new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
|
|
||||||
assertEquals(0, remoteId.getPingInterval());
|
assertEquals(0, remoteId.getPingInterval());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue