HADOOP-6889. Make RPC to have an option to timeout. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@982681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hairong Kuang 2010-08-05 16:39:10 +00:00
parent c47d34a866
commit 714e5f7165
7 changed files with 148 additions and 52 deletions

View File

@ -33,6 +33,8 @@ Trunk (unreleased changes)
HADOOP-6892. Common component of HDFS-1150 (Verify datanodes' identities
to clients in secure clusters) (jghoman)
HADOOP-6889. Make RPC to have an option to timeout. (hairong)
IMPROVEMENTS
HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name
@ -105,6 +107,7 @@ Trunk (unreleased changes)
periodically. (Owen O'Malley and ddas via ddas)
HADOOP-6890. Improve listFiles API introduced by HADOOP-6870. (hairong)
OPTIMIZATIONS
BUG FIXES

View File

@ -98,11 +98,13 @@ private static class ClientTransceiver extends Transceiver {
public ClientTransceiver(InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
this.tunnel =
(TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
addr, ticket, conf, factory);
addr, ticket, conf, factory,
rpcTimeout);
this.remote = addr;
}
@ -128,14 +130,15 @@ public void close() throws IOException {
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public Object getProxy(Class protocol, long clientVersion,
public Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
return Proxy.newProxyInstance
(protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory));
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
}
/** Stop this proxy. */
@ -152,8 +155,9 @@ private static class Invoker implements InvocationHandler, Closeable {
private final ReflectRequestor requestor;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException {
this.tx = new ClientTransceiver(addr, ticket, conf, factory);
SocketFactory factory,
int rpcTimeout) throws IOException {
this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
this.requestor = new ReflectRequestor(protocol, tx);
}
@Override public Object invoke(Object proxy, Method method, Object[] args)
@ -169,7 +173,7 @@ public void close() throws IOException {
private static class TunnelResponder extends ReflectResponder
implements TunnelProtocol {
public TunnelResponder(Class iface, Object impl) {
public TunnelResponder(Class<?> iface, Object impl) {
super(iface, impl);
}
@ -192,7 +196,7 @@ public Object[] call(Method method, Object[][] params,
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public RPC.Server getServer(Class iface, Object impl, String bindAddress,
public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
int port, int numHandlers, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager

View File

@ -219,6 +219,7 @@ private class Connection extends Thread {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
private int rpcTimeout;
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@ -233,7 +234,7 @@ public Connection(ConnectionId remoteId) throws IOException {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
this.rpcTimeout = remoteId.getRpcTimeout();
UserGroupInformation ticket = remoteId.getTicket();
Class<?> protocol = remoteId.getProtocol();
this.useSasl = UserGroupInformation.isSecurityEnabled();
@ -321,11 +322,13 @@ protected PingInputStream(InputStream in) {
}
/* Process timeout exception
* if the connection is not going to be closed, send a ping.
* if the connection is not going to be closed or
* is not configured to have a RPC timeout, send a ping.
* (if rpcTimeout is not set to be 0, then RPC should timeout.
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get()) {
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
throw e;
} else {
sendPing();
@ -405,6 +408,9 @@ private synchronized void setupConnection() throws IOException {
this.socket.setTcpNoDelay(tcpNoDelay);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
if (rpcTimeout > 0) {
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
@ -952,7 +958,7 @@ public Writable call(Writable param, InetSocketAddress address)
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
return call(param, addr, null, ticket);
return call(param, addr, null, ticket, 0);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@ -961,10 +967,12 @@ public Writable call(Writable param, InetSocketAddress addr,
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket)
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout)
throws InterruptedException, IOException {
Call call = new Call(param);
Connection connection = getConnection(addr, protocol, ticket, call);
Connection connection = getConnection(
addr, protocol, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
synchronized (call) {
@ -1054,7 +1062,7 @@ public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
ParallelCall call = new ParallelCall(params[i], results, i);
try {
Connection connection =
getConnection(addresses[i], protocol, ticket, call);
getConnection(addresses[i], protocol, ticket, 0, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
@ -1078,6 +1086,7 @@ public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
private Connection getConnection(InetSocketAddress addr,
Class<?> protocol,
UserGroupInformation ticket,
int rpcTimeout,
Call call)
throws IOException, InterruptedException {
if (!running.get()) {
@ -1089,7 +1098,8 @@ private Connection getConnection(InetSocketAddress addr,
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
ConnectionId remoteId = new ConnectionId(
addr, protocol, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
@ -1117,12 +1127,14 @@ private static class ConnectionId {
UserGroupInformation ticket;
Class<?> protocol;
private static final int PRIME = 16777619;
private int rpcTimeout;
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket) {
UserGroupInformation ticket, int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
}
InetSocketAddress getAddress() {
@ -1137,6 +1149,9 @@ UserGroupInformation getTicket() {
return ticket;
}
private int getRpcTimeout() {
return rpcTimeout;
}
@Override
public boolean equals(Object obj) {
@ -1144,15 +1159,19 @@ public boolean equals(Object obj) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && protocol == id.protocol &&
((ticket != null && ticket.equals(id.ticket)) ||
(ticket == id.ticket));
(ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
}
return false;
}
@Override
@Override // simply use the default Object#hashcode() ?
public int hashCode() {
return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^
(ticket == null ? 0 : ticket.hashCode());
return (address.hashCode() + PRIME * (
PRIME * (
PRIME * System.identityHashCode(protocol) ^
System.identityHashCode(ticket)
) ^ System.identityHashCode(rpcTimeout)
));
}
}
}

View File

@ -156,7 +156,7 @@ public long getServerVersion() {
}
public static Object waitForProxy(
Class protocol,
Class<?> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
@ -170,18 +170,37 @@ public static Object waitForProxy(
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @param timeout time in milliseconds before giving up
* @param connTimeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
public static Object waitForProxy(Class protocol, long clientVersion,
public static Object waitForProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
long timeout) throws IOException {
long connTimeout) throws IOException {
return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
}
/**
* Get a proxy connection to a remote server
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @param rpcTimeout timeout for each RPC
* @param timeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
public static Object waitForProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
int rpcTimeout,
long timeout) throws IOException {
long startTime = System.currentTimeMillis();
IOException ioe;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf);
return getProxy(protocol, clientVersion, addr,
UserGroupInformation.getCurrentUser(), conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
@ -208,7 +227,7 @@ public static Object waitForProxy(Class protocol, long clientVersion,
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static Object getProxy(Class protocol, long clientVersion,
public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@ -217,16 +236,39 @@ public static Object getProxy(Class protocol, long clientVersion,
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static Object getProxy(Class protocol, long clientVersion,
public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
}
/**
* Construct a client-side proxy that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @return the proxy
* @throws IOException if any error occurs
*/
public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol,conf)
.getProxy(protocol, clientVersion, addr, ticket, conf, factory);
return getProtocolEngine(protocol,conf).getProxy(protocol,
clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
/**
@ -239,7 +281,7 @@ public static Object getProxy(Class protocol, long clientVersion,
* @return a proxy instance
* @throws IOException
*/
public static Object getProxy(Class protocol, long clientVersion,
public static Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf)
throws IOException {

View File

@ -32,10 +32,10 @@
interface RpcEngine {
/** Construct a client-side proxy object. */
Object getProxy(Class protocol,
Object getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException;
SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(Object proxy);
@ -46,7 +46,7 @@ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
throws IOException, InterruptedException;
/** Construct a server for a protocol implementation instance. */
RPC.Server getServer(Class protocol, Object instance, String bindAddress,
RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager

View File

@ -48,7 +48,7 @@ class WritableRpcEngine implements RpcEngine {
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class[] parameterClasses;
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
@ -64,7 +64,7 @@ public Invocation(Method method, Object[] parameters) {
public String getMethodName() { return methodName; }
/** The parameter classes. */
public Class[] getParameterClasses() { return parameterClasses; }
public Class<?>[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
@ -172,18 +172,21 @@ private void stopClient(Client client) {
private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler {
private Class protocol;
private Class<?> protocol;
private InetSocketAddress address;
private UserGroupInformation ticket;
private int rpcTimeout;
private Client client;
private boolean isClosed = false;
public Invoker(Class protocol,
public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory,
int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
this.client = CLIENTS.getClient(conf, factory);
}
@ -197,7 +200,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
protocol, ticket);
protocol, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@ -216,14 +219,15 @@ synchronized private void close() {
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public Object getProxy(Class protocol, long clientVersion,
public Object getProxy(Class<?> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
Object proxy = Proxy.newProxyInstance
(protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory));
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
if (proxy instanceof VersionedProtocol) {
long serverVersion = ((VersionedProtocol)proxy)
.getProtocolVersion(protocol.getName(), clientVersion);
@ -276,7 +280,7 @@ public Object[] call(Method method, Object[][] params,
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public Server getServer(Class protocol,
public Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)

View File

@ -29,6 +29,7 @@
import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import javax.net.SocketFactory;
import junit.framework.TestCase;
@ -43,6 +44,7 @@ public class TestIPC extends TestCase {
final private static Configuration conf = new Configuration();
final static private int PING_INTERVAL = 1000;
final static private int MIN_SLEEP_TIME = 1000;
static {
Client.setPingInterval(conf, PING_INTERVAL);
@ -66,8 +68,9 @@ public TestServer(int handlerCount, boolean sleep)
public Writable call(Class<?> protocol, Writable param, long receiveTime)
throws IOException {
if (sleep) {
// sleep a bit
try {
Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL)); // sleep a bit
Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
} catch (InterruptedException e) {}
}
return param; // echo param as result
@ -91,7 +94,7 @@ public void run() {
try {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value =
(LongWritable)client.call(param, server, null, null);
(LongWritable)client.call(param, server, null, null, 0);
if (!param.equals(value)) {
LOG.fatal("Call failed!");
failed = true;
@ -142,6 +145,7 @@ public void run() {
public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100);
testSerial(3, true, 2, 5, 10);
}
public void testSerial(int handlerCount, boolean handlerSleep,
@ -219,7 +223,7 @@ public void testStandAloneClient() throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null);
address, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
String message = e.getMessage();
@ -276,7 +280,7 @@ public void testErrorClient() throws Exception {
Client client = new Client(LongErrorWritable.class, conf);
try {
client.call(new LongErrorWritable(RANDOM.nextLong()),
addr, null, null);
addr, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
// check error
@ -296,7 +300,7 @@ public void testRuntimeExceptionWritable() throws Exception {
Client client = new Client(LongRTEWritable.class, conf);
try {
client.call(new LongRTEWritable(RANDOM.nextLong()),
addr, null, null);
addr, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
// check error
@ -322,13 +326,33 @@ public void testSocketFactoryException() throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null);
address, null, null, 0);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault"));
}
}
public void testIpcTimeout() throws Exception {
// start server
Server server = new TestServer(1, true);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
// start client
Client client = new Client(LongWritable.class, conf);
// set timeout to be less than MIN_SLEEP_TIME
try {
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME/2);
fail("Expected an exception to have been thrown");
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
}
// set timeout to be bigger than 3*ping interval
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME);
}
public static void main(String[] args) throws Exception {