HADOOP-16127. In ipc.Client, put a new connection could happen after stop.

This commit is contained in:
Tsz Wo Nicholas Sze 2019-02-26 15:14:21 -08:00
parent a5a751b418
commit 9192f71e21
2 changed files with 72 additions and 55 deletions

View File

@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@ -84,9 +85,7 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@Public @Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class Client implements AutoCloseable { public class Client implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(Client.class); public static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final int STOP_SLEEP_TIME_MS = 10;
/** A counter for generating call IDs. */ /** A counter for generating call IDs. */
private static final AtomicInteger callIdCounter = new AtomicInteger(); private static final AtomicInteger callIdCounter = new AtomicInteger();
@ -124,15 +123,17 @@ public class Client implements AutoCloseable {
EXTERNAL_CALL_HANDLER.set(externalHandler); EXTERNAL_CALL_HANDLER.set(externalHandler);
} }
private ConcurrentMap<ConnectionId, Connection> connections = private final ConcurrentMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final Object putLock = new Object();
private final Object emptyCondition = new Object();
private final AtomicBoolean running = new AtomicBoolean(true);
private Class<? extends Writable> valueClass; // class of call values private Class<? extends Writable> valueClass; // class of call values
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf; final private Configuration conf;
private SocketFactory socketFactory; // how to create sockets private SocketFactory socketFactory; // how to create sockets
private int refCount = 1; private final AtomicInteger refCount = new AtomicInteger(1);
private final int connectionTimeout; private final int connectionTimeout;
@ -207,7 +208,7 @@ public class Client implements AutoCloseable {
return clientExecutor; return clientExecutor;
} }
}; }
/** /**
* set the ping interval value in configuration * set the ping interval value in configuration
@ -281,29 +282,19 @@ public class Client implements AutoCloseable {
public static final ExecutorService getClientExecutor() { public static final ExecutorService getClientExecutor() {
return Client.clientExcecutorFactory.clientExecutor; return Client.clientExcecutorFactory.clientExecutor;
} }
/** /**
* Increment this client's reference count * Increment this client's reference count
*
*/ */
synchronized void incCount() { void incCount() {
refCount++; refCount.incrementAndGet();
} }
/** /**
* Decrement this client's reference count * Decrement this client's reference count
*
*/ */
synchronized void decCount() { int decAndGetCount() {
refCount--; return refCount.decrementAndGet();
}
/**
* Return if this client has no reference
*
* @return true if this client has no reference; false otherwise
*/
synchronized boolean isZeroReference() {
return refCount==0;
} }
/** Check the rpc response header. */ /** Check the rpc response header. */
@ -452,17 +443,13 @@ public class Client implements AutoCloseable {
private final Object sendRpcRequestLock = new Object(); private final Object sendRpcRequestLock = new Object();
private AtomicReference<Thread> connectingThread = new AtomicReference<>(); private AtomicReference<Thread> connectingThread = new AtomicReference<>();
private final Consumer<Connection> removeMethod;
public Connection(ConnectionId remoteId, int serviceClass) throws IOException { Connection(ConnectionId remoteId, int serviceClass,
Consumer<Connection> removeMethod) {
this.remoteId = remoteId; this.remoteId = remoteId;
this.server = remoteId.getAddress(); this.server = remoteId.getAddress();
if (server.isUnresolved()) {
throw NetUtils.wrapException(server.getHostName(),
server.getPort(),
null,
0,
new UnknownHostException());
}
this.maxResponseLength = remoteId.conf.getInt( this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT); CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
@ -481,7 +468,12 @@ public class Client implements AutoCloseable {
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, PING_CALL_ID, OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, clientId); RpcConstants.INVALID_RETRY_COUNT, clientId);
pingHeader.writeDelimitedTo(buf); try {
pingHeader.writeDelimitedTo(buf);
} catch (IOException e) {
throw new IllegalStateException("Failed to write to buf for "
+ remoteId + " in " + Client.this + " due to " + e, e);
}
pingRequest = buf.toByteArray(); pingRequest = buf.toByteArray();
} }
this.pingInterval = remoteId.getPingInterval(); this.pingInterval = remoteId.getPingInterval();
@ -494,6 +486,8 @@ public class Client implements AutoCloseable {
this.soTimeout = pingInterval; this.soTimeout = pingInterval;
} }
this.serviceClass = serviceClass; this.serviceClass = serviceClass;
this.removeMethod = removeMethod;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms."); LOG.debug("The ping interval is " + this.pingInterval + " ms.");
} }
@ -1253,7 +1247,7 @@ public class Client implements AutoCloseable {
// We have marked this connection as closed. Other thread could have // We have marked this connection as closed. Other thread could have
// already known it and replace this closedConnection with a new one. // already known it and replace this closedConnection with a new one.
// We should only remove this closedConnection. // We should only remove this closedConnection.
connections.remove(remoteId, this); removeMethod.accept(this);
// close the streams and therefore the socket // close the streams and therefore the socket
IOUtils.closeStream(ipcStreams); IOUtils.closeStream(ipcStreams);
@ -1325,7 +1319,13 @@ public class Client implements AutoCloseable {
public Client(Class<? extends Writable> valueClass, Configuration conf) { public Client(Class<? extends Writable> valueClass, Configuration conf) {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
} }
@Override
public String toString() {
return getClass().getSimpleName() + "-"
+ StringUtils.byteToHexString(clientId);
}
/** Return the socket factory of this client /** Return the socket factory of this client
* *
* @return this client's socket factory * @return this client's socket factory
@ -1340,11 +1340,12 @@ public class Client implements AutoCloseable {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Stopping client"); LOG.debug("Stopping client");
} }
synchronized (putLock) { // synchronized to avoid put after stop
if (!running.compareAndSet(true, false)) { if (!running.compareAndSet(true, false)) {
return; return;
}
} }
// wake up all connections // wake up all connections
for (Connection conn : connections.values()) { for (Connection conn : connections.values()) {
conn.interrupt(); conn.interrupt();
@ -1352,13 +1353,15 @@ public class Client implements AutoCloseable {
} }
// wait until all connections are closed // wait until all connections are closed
while (!connections.isEmpty()) { synchronized (emptyCondition) {
try { // synchronized the loop to guarantee wait must be notified.
Thread.sleep(STOP_SLEEP_TIME_MS); while (!connections.isEmpty()) {
} catch (InterruptedException e) { try {
emptyCondition.wait();
} catch (InterruptedException e) {
}
} }
} }
clientExcecutorFactory.unrefAndCleanup(); clientExcecutorFactory.unrefAndCleanup();
} }
@ -1569,24 +1572,37 @@ public class Client implements AutoCloseable {
private Connection getConnection(ConnectionId remoteId, private Connection getConnection(ConnectionId remoteId,
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException { throws IOException {
if (!running.get()) { final InetSocketAddress address = remoteId.getAddress();
// the client is stopped if (address.isUnresolved()) {
throw new IOException("The client is stopped"); throw NetUtils.wrapException(address.getHostName(),
address.getPort(),
null,
0,
new UnknownHostException());
} }
final Consumer<Connection> removeMethod = c -> {
final boolean removed = connections.remove(remoteId, c);
if (removed && connections.isEmpty()) {
synchronized (emptyCondition) {
emptyCondition.notify();
}
}
};
Connection connection; Connection connection;
/* we could avoid this allocation for each RPC by having a /* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the * connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok. * refs for keys in HashMap properly. For now its ok.
*/ */
while (true) { while (true) {
// These lines below can be shorten with computeIfAbsent in Java8 synchronized (putLock) { // synchronized to avoid put after stop
connection = connections.get(remoteId); if (!running.get()) {
if (connection == null) { throw new IOException("Failed to get connection for " + remoteId
connection = new Connection(remoteId, serviceClass); + ", " + call + ": " + this + " is already stopped");
Connection existing = connections.putIfAbsent(remoteId, connection);
if (existing != null) {
connection = existing;
} }
connection = connections.computeIfAbsent(remoteId,
id -> new Connection(id, serviceClass, removeMethod));
} }
if (connection.addCall(call)) { if (connection.addCall(call)) {
@ -1596,7 +1612,7 @@ public class Client implements AutoCloseable {
// have already known this closedConnection, and replace it with a new // have already known this closedConnection, and replace it with a new
// connection. So we should call conditional remove to make sure we only // connection. So we should call conditional remove to make sure we only
// remove this closedConnection. // remove this closedConnection.
connections.remove(remoteId, connection); removeMethod.accept(connection);
} }
} }

View File

@ -96,16 +96,17 @@ public class ClientCache {
if (Client.LOG.isDebugEnabled()) { if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping client from cache: " + client); Client.LOG.debug("stopping client from cache: " + client);
} }
final int count;
synchronized (this) { synchronized (this) {
client.decCount(); count = client.decAndGetCount();
if (client.isZeroReference()) { if (count == 0) {
if (Client.LOG.isDebugEnabled()) { if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("removing client from cache: " + client); Client.LOG.debug("removing client from cache: " + client);
} }
clients.remove(client.getSocketFactory()); clients.remove(client.getSocketFactory());
} }
} }
if (client.isZeroReference()) { if (count == 0) {
if (Client.LOG.isDebugEnabled()) { if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping actual client because no more references remain: " Client.LOG.debug("stopping actual client because no more references remain: "
+ client); + client);