From 01c0448ccd943186ba8045074a59e53f8f08c364 Mon Sep 17 00:00:00 2001 From: chenheng Date: Sat, 30 Apr 2016 09:27:32 +0800 Subject: [PATCH] HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call response --- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 6 ++ .../hbase/ipc/AsyncServerResponseHandler.java | 8 +-- .../hadoop/hbase/ipc/AbstractTestIPC.java | 69 ++++++++++++++++++- 3 files changed, 74 insertions(+), 9 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 53eb824f93b..ef3240cfe55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -210,6 +210,12 @@ public class AsyncRpcChannel { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); ch.pipeline().addLast(new AsyncServerResponseHandler(this)); + ch.closeFuture().addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + close(null); + } + }); try { writeChannelHeader(ch).addListener(new GenericFutureListener() { @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index e0c7586477c..5c604a4f21c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; - import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -105,11 +103,6 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandlere @@ -123,4 +116,5 @@ public class AsyncServerResponseHandler extends SimpleChannelInboundHandler of(CELL))), md, param, + md.getOutputType().toProto(), User.getCurrent(), address, + new MetricsConnection.CallStats()); + fail("RPC should have failed because server closed connection"); + } catch(IOException ex) { + // pass + } + } finally { + rpcServer.stop(); + } + } + /** * Instance of RpcServer that echoes client hostAddress back to client */