HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call response

This commit is contained in:
chenheng 2016-09-01 10:33:24 +08:00
parent 8479554275
commit 3892550484
2 changed files with 56 additions and 1 deletions

View File

@ -2677,7 +2677,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
Connection register(SocketChannel channel) {
Connection connection = new Connection(channel, System.currentTimeMillis());
Connection connection = getConnection(channel, System.currentTimeMillis());
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +

View File

@ -35,6 +35,8 @@ import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
@ -287,4 +289,57 @@ public abstract class AbstractTestIPC {
rpcServer.stop();
}
}
static class TestFailingRpcServer extends TestRpcServer {
TestFailingRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1), CONF);
}
TestFailingRpcServer(Configuration conf) throws IOException {
this(new FifoRpcScheduler(conf, 1), conf);
}
TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
super(scheduler, conf);
}
class FailingConnection extends Connection {
public FailingConnection(SocketChannel channel, long lastContact) {
super(channel, lastContact);
}
@Override
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
}
}
@Override
protected Connection getConnection(SocketChannel channel, long time) {
return new FailingConnection(channel, time);
}
}
/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test (timeout = 30000)
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration conf = new Configuration(CONF);
RpcServer rpcServer = new TestFailingRpcServer(conf);
try (AbstractRpcClient client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
stub.echo(
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
param);
fail("RPC should have failed because connection closed");
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.toString());
} finally {
rpcServer.stop();
}
}
}