HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call response
This commit is contained in:
parent
c236409c3b
commit
01c0448ccd
|
@ -210,6 +210,12 @@ public class AsyncRpcChannel {
|
|||
ch.pipeline().addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
|
||||
ch.pipeline().addLast(new AsyncServerResponseHandler(this));
|
||||
ch.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
close(null);
|
||||
}
|
||||
});
|
||||
try {
|
||||
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
|
||||
@Override
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
|
@ -105,11 +103,6 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<Byte
|
|||
channel.close(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
channel.close(new IOException("connection closed"));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param e Proto exception
|
||||
* @return RemoteException made from passed <code>e</code>
|
||||
|
@ -123,4 +116,5 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<Byte
|
|||
e.getPort(), doNotRetry)
|
||||
: new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.net.ConnectException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -158,6 +161,39 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
static class TestFailingRpcServer extends TestRpcServer {
|
||||
|
||||
TestFailingRpcServer() throws IOException {
|
||||
this(new FifoRpcScheduler(CONF, 1), CONF);
|
||||
}
|
||||
|
||||
TestFailingRpcServer(Configuration conf) throws IOException {
|
||||
this(new FifoRpcScheduler(conf, 1), conf);
|
||||
}
|
||||
|
||||
TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
|
||||
super(scheduler, conf);
|
||||
}
|
||||
|
||||
class FailingConnection extends Connection {
|
||||
public FailingConnection(SocketChannel channel, long lastContact) {
|
||||
super(channel, lastContact);
|
||||
}
|
||||
@Override
|
||||
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
|
||||
// this will throw exception after the connection header is read, and an RPC is sent
|
||||
// from client
|
||||
throw new DoNotRetryIOException("Failing for test");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection(SocketChannel channel, long time) {
|
||||
return new FailingConnection(channel, time);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
|
||||
|
||||
/**
|
||||
|
@ -296,8 +332,8 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
/** Tests that the rpc scheduler is called when requests arrive. */
|
||||
@Test
|
||||
/** Tests that RPC max request size is respected from the server side */
|
||||
@Test (timeout = 30000)
|
||||
public void testRpcMaxRequestSize() throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
|
||||
|
@ -327,6 +363,35 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
/** Tests that the connection closing is handled by the client with outstanding RPC calls */
|
||||
@Test (timeout = 30000)
|
||||
public void testConnectionCloseWithOutstandingRPCs() throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration(CONF);
|
||||
|
||||
RpcServer rpcServer = new TestFailingRpcServer(conf);
|
||||
AbstractRpcClient client = createRpcClient(conf);
|
||||
try {
|
||||
rpcServer.start();
|
||||
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
try {
|
||||
client.call(new PayloadCarryingRpcController(
|
||||
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
|
||||
md.getOutputType().toProto(), User.getCurrent(), address,
|
||||
new MetricsConnection.CallStats());
|
||||
fail("RPC should have failed because server closed connection");
|
||||
} catch(IOException ex) {
|
||||
// pass
|
||||
}
|
||||
} finally {
|
||||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Instance of RpcServer that echoes client hostAddress back to client
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue