Remove locking around connection attempts (#46845)

Currently in the ConnectionManager we lock around the node id. This is
odd because we key connections by the ephemeral id. Upon further
investigation it appears to me that we do not need the locking. Using
the concurrent map, we can ensure that only one connection attempt
completes. There is a very small chance that a new connection attempt
will proceed right as another connection attempt is completing. However,
since the whole process is asynchronous and event oriented
(lightweight), that does not seem to be an issue.
This commit is contained in:
Tim Brooks 2019-09-24 10:55:49 -06:00
parent 00c1c0132b
commit 71ec0707cf
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
3 changed files with 59 additions and 64 deletions

View File

@ -43,12 +43,25 @@ public final class ListenableFuture<V> extends BaseFuture<V> implements ActionLi
private volatile boolean done = false;
private final List<Tuple<ActionListener<V>, ExecutorService>> listeners = new ArrayList<>();
/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*
* This overload forwards to the three-argument variant with a {@code null} ThreadContext.
*
* @param listener the listener to notify once this future completes
* @param executor the executor service handed to the three-argument variant
*/
public void addListener(ActionListener<V> listener, ExecutorService executor) {
addListener(listener, executor, null);
}
/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*
* The provided ThreadContext (if not null) is applied when executing the listener.
*/
public void addListener(ActionListener<V> listener, ExecutorService executor, ThreadContext threadContext) {
if (done) {
// run the callback directly, we don't hold the lock and don't need to fork!

View File

@ -20,22 +20,19 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -53,8 +50,7 @@ public class ConnectionManager implements Closeable {
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>(); // protects concurrent access to connectingNodes
private final Map<DiscoveryNode, List<ActionListener<Void>>> connectingNodes = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@Override
protected void closeInternal() {
@ -122,40 +118,37 @@ public class ConnectionManager implements Closeable {
return;
}
try (Releasable lock = connectionLock.acquire(node.getId())) {
Transport.Connection connection = connectedNodes.get(node);
if (connection != null) {
assert connectingNodes.containsKey(node) == false;
lock.close();
connectingRefCounter.decRef();
listener.onResponse(null);
return;
}
final List<ActionListener<Void>> connectionListeners = connectingNodes.computeIfAbsent(node, n -> new ArrayList<>());
connectionListeners.add(listener);
if (connectionListeners.size() > 1) {
// wait on previous entry to complete connection attempt
connectingRefCounter.decRef();
return;
}
if (connectedNodes.containsKey(node)) {
connectingRefCounter.decRef();
listener.onResponse(null);
return;
}
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
if (existingListener != null) {
try {
// wait on previous entry to complete connection attempt
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
} finally {
connectingRefCounter.decRef();
}
return;
}
currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
ignored -> {
assert Transports.assertNotTransportThread("connection validator success");
boolean success = false;
List<ActionListener<Void>> listeners = null;
try {
// we acquire a connection lock, so no way there is an existing connection
try (Releasable ignored2 = connectionLock.acquire(node.getId())) {
connectedNodes.put(node, conn);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
if (connectedNodes.putIfAbsent(node, conn) != null) {
logger.debug("existing connection to node [{}], closing new redundant connection", node);
IOUtils.closeWhileHandlingException(conn);
} else {
logger.debug("connected to node [{}]", node);
try {
connectionListener.onNodeConnected(node);
} finally {
@ -166,45 +159,21 @@ public class ConnectionManager implements Closeable {
connectionListener.onNodeDisconnected(node);
}));
}
if (conn.isClosed()) {
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
success = true;
listeners = connectingNodes.remove(node);
}
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) { // close the connection if there is a failure
logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
IOUtils.closeWhileHandlingException(conn);
} else {
releaseOnce.run();
ActionListener.onResponse(listeners, null);
}
ListenableFuture<Void> future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
releaseOnce.run();
future.onResponse(null);
}
}, e -> {
assert Transports.assertNotTransportThread("connection validator failure");
IOUtils.closeWhileHandlingException(conn);
final List<ActionListener<Void>> listeners;
try (Releasable ignored = connectionLock.acquire(node.getId())) {
listeners = connectingNodes.remove(node);
}
releaseOnce.run();
ActionListener.onFailure(listeners, e);
failConnectionListeners(node, releaseOnce, e, currentListener);
}));
}, e -> {
assert Transports.assertNotTransportThread("internalOpenConnection failure");
final List<ActionListener<Void>> listeners;
try (Releasable ignored = connectionLock.acquire(node.getId())) {
listeners = connectingNodes.remove(node);
}
releaseOnce.run();
if (listeners != null) {
ActionListener.onFailure(listeners, e);
}
failConnectionListeners(node, releaseOnce, e, currentListener);
}));
}
@ -296,6 +265,15 @@ public class ConnectionManager implements Closeable {
}));
}
/**
 * Fails the pending connection attempt registered for the given node.
 *
 * The entry for {@code node} is removed from {@code pendingConnections} and {@code releaseOnce}
 * is run regardless of whether an entry was present. If an entry was removed, it is completed
 * exceptionally with {@code e}.
 *
 * @param node             the node whose pending connection entry is removed
 * @param releaseOnce      the run-once action executed after the entry has been removed
 * @param e                the failure passed to the removed future
 * @param expectedListener the future that is expected to be registered for {@code node}; only checked via assertion
 */
private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture<Void> expectedListener) {
    final ListenableFuture<Void> pending = pendingConnections.remove(node);
    // Always release, even when nothing was registered for this node.
    releaseOnce.run();
    if (pending == null) {
        return;
    }
    assert pending == expectedListener : "Listener in pending map is different than the expected listener";
    pending.onFailure(e);
}
/**
* Returns the {@code defaultProfile} of this connection manager.
*/
ConnectionProfile getConnectionProfile() {
return defaultProfile;
}

View File

@ -151,10 +151,11 @@ public class ConnectionManagerTests extends ESTestCase {
}
};
CyclicBarrier barrier = new CyclicBarrier(11);
List<Thread> threads = new ArrayList<>();
AtomicInteger nodeConnectedCount = new AtomicInteger();
AtomicInteger nodeFailureCount = new AtomicInteger();
CyclicBarrier barrier = new CyclicBarrier(11);
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
@ -166,6 +167,9 @@ public class ConnectionManagerTests extends ESTestCase {
connectionManager.connectToNode(node, connectionProfile, validator,
ActionListener.wrap(c -> {
nodeConnectedCount.incrementAndGet();
if (connectionManager.nodeConnected(node) == false) {
throw new AssertionError("Expected node to be connected");
}
assert latch.getCount() == 1;
latch.countDown();
}, e -> {