HBASE-14474 DeadLock in RpcClientImpl.Connection.close()
This commit is contained in:
parent
bdea8b891f
commit
32f49fa7fc
|
@ -914,14 +914,20 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
IPCUtil.write(this.out, header, call.param, cellBlock);
|
IPCUtil.write(this.out, header, call.param, cellBlock);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// We set the value inside the synchronized block, this way the next in line
|
// We set the value inside the synchronized block, this way the next in line
|
||||||
// won't even try to write
|
// won't even try to write. Otherwise we might miss a call in the calls map?
|
||||||
markClosed(e);
|
shouldCloseConnection.set(true);
|
||||||
close();
|
|
||||||
writeException = e;
|
writeException = e;
|
||||||
interrupt();
|
interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474
|
||||||
|
if (writeException != null) {
|
||||||
|
if (markClosed(writeException)) {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We added a call, and may be started the connection close. In both cases, we
|
// We added a call, and may be started the connection close. In both cases, we
|
||||||
// need to notify the reader.
|
// need to notify the reader.
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -1022,10 +1028,11 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
e.getStackTrace(), doNotRetry);
|
e.getStackTrace(), doNotRetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void markClosed(IOException e) {
|
protected synchronized boolean markClosed(IOException e) {
|
||||||
if (e == null) throw new NullPointerException();
|
if (e == null) throw new NullPointerException();
|
||||||
|
|
||||||
if (shouldCloseConnection.compareAndSet(false, true)) {
|
boolean ret = shouldCloseConnection.compareAndSet(false, true);
|
||||||
|
if (ret) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
|
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -1034,6 +1041,7 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1142,14 +1150,15 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
if (connsToClose != null) {
|
if (connsToClose != null) {
|
||||||
for (Connection conn : connsToClose) {
|
for (Connection conn : connsToClose) {
|
||||||
conn.markClosed(new InterruptedIOException("RpcClient is closing"));
|
if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
|
||||||
conn.close();
|
conn.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// wait until all connections are closed
|
// wait until all connections are closed
|
||||||
while (!connections.isEmpty()) {
|
while (!connections.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(100);
|
Thread.sleep(10);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
|
LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
|
||||||
" connections.");
|
" connections.");
|
||||||
|
|
Loading…
Reference in New Issue