HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
Sergey Soldatov 2016-06-05 23:46:03 -07:00 committed by Enis Soztutar
parent 376ad0d986
commit da88b48240
2 changed files with 31 additions and 9 deletions

View File

@ -1202,11 +1202,10 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
if (connsToClose != null) { if (connsToClose != null) {
for (Connection conn : connsToClose) { for (Connection conn : connsToClose) {
if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) { conn.markClosed(new InterruptedIOException("RpcClient is closing"));
conn.close(); conn.close();
} }
} }
}
// wait until all connections are closed // wait until all connections are closed
while (!connections.isEmpty()) { while (!connections.isEmpty()) {
try { try {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
static class SimpleClient extends Thread { static class SimpleClient extends Thread {
AbstractRpcClient rpcClient; AbstractRpcClient rpcClient;
AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean running = new AtomicBoolean(true);
AtomicBoolean sending = new AtomicBoolean(false);
AtomicReference<Throwable> exception = new AtomicReference<>(null); AtomicReference<Throwable> exception = new AtomicReference<>(null);
Cluster cluster; Cluster cluster;
String id; String id;
@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
if (address == null) { if (address == null) {
throw new IOException("Listener channel is closed"); throw new IOException("Listener channel is closed");
} }
sending.set(true);
ret = (EchoResponseProto) ret = (EchoResponseProto)
rpcClient.callBlockingMethod(md, null, param, ret, user, address); rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) { } catch (Exception e) {
@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
void stopRunning() { void stopRunning() {
running.set(false); running.set(false);
} }
boolean isSending() {
return sending.get();
}
void rethrowException() throws Throwable { void rethrowException() throws Throwable {
if (exception.get() != null) { if (exception.get() != null) {
@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
} }
} }
/*
Test that not started connections are successfully removed from connection pool when
rpc client is closing.
*/
@Test (timeout = 30000)
public void testRpcWithWriteThread() throws IOException, InterruptedException {
LOG.info("Starting test");
Cluster cluster = new Cluster(1, 1);
cluster.startServer();
conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
for(int i = 0; i <1000; i++) {
AbstractRpcClient rpcClient = createRpcClient(conf, true);
SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
client.start();
while(!client.isSending()) {
Thread.sleep(1);
}
client.stopRunning();
rpcClient.close();
}
}
@Test (timeout = 900000) @Test (timeout = 900000)
public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable { public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
for (int i = 0; i < numIterations; i++) { for (int i = 0; i < numIterations; i++) {