Improve RemoteConnectionManager consistency (#55759)

In order to iterate through remote connections, the remote connection
manager maintains a local cache of connected nodes. Unfortunately this
is difficult in relationship with testing as it is inherently racy in
comparison to the parent connection manager map of connections.

This commit improves the relationship by only returning a cached
connection if it is still registered with the parent. If the connection
is not open, we will go to the slow path of allocating a iterator
directly from the parent.
This commit is contained in:
Tim Brooks 2020-04-28 11:22:40 -06:00
parent 1e65ead01f
commit 54dbea6c65
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
5 changed files with 52 additions and 28 deletions

View File

@ -219,6 +219,7 @@ public class ClusterConnectionManager implements ConnectionManager {
return connectedNodes.size(); return connectedNodes.size();
} }
@Override
public Set<DiscoveryNode> getAllConnectedNodes() { public Set<DiscoveryNode> getAllConnectedNodes() {
return Collections.unmodifiableSet(connectedNodes.keySet()); return Collections.unmodifiableSet(connectedNodes.keySet());
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import java.io.Closeable; import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
public interface ConnectionManager extends Closeable { public interface ConnectionManager extends Closeable {
@ -43,6 +44,8 @@ public interface ConnectionManager extends Closeable {
void disconnectFromNode(DiscoveryNode node); void disconnectFromNode(DiscoveryNode node);
Set<DiscoveryNode> getAllConnectedNodes();
int size(); int size();
@Override @Override

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
public class RemoteConnectionManager implements ConnectionManager { public class RemoteConnectionManager implements ConnectionManager {
@ -33,7 +34,7 @@ public class RemoteConnectionManager implements ConnectionManager {
private final String clusterAlias; private final String clusterAlias;
private final ConnectionManager delegate; private final ConnectionManager delegate;
private final AtomicLong counter = new AtomicLong(); private final AtomicLong counter = new AtomicLong();
private volatile List<Transport.Connection> connections = Collections.emptyList(); private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();
RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) { RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
this.clusterAlias = clusterAlias; this.clusterAlias = clusterAlias;
@ -41,12 +42,12 @@ public class RemoteConnectionManager implements ConnectionManager {
this.delegate.addListener(new TransportConnectionListener() { this.delegate.addListener(new TransportConnectionListener() {
@Override @Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) { public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
addConnection(connection); addConnectedNode(node);
} }
@Override @Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
removeConnection(connection); removeConnectedNode(node);
} }
}); });
} }
@ -98,23 +99,36 @@ public class RemoteConnectionManager implements ConnectionManager {
} }
public Transport.Connection getAnyRemoteConnection() { public Transport.Connection getAnyRemoteConnection() {
List<Transport.Connection> localConnections = this.connections; List<DiscoveryNode> localConnectedNodes = this.connectedNodes;
if (localConnections.isEmpty()) { long curr;
throw new NoSuchRemoteClusterException(clusterAlias); while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE);
} else { if (localConnectedNodes.isEmpty() == false) {
long curr; DiscoveryNode nextNode = localConnectedNodes.get(Math.toIntExact(Math.floorMod(curr, (long) localConnectedNodes.size())));
while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE); try {
return localConnections.get(Math.toIntExact(Math.floorMod(curr, (long) localConnections.size()))); return delegate.getConnection(nextNode);
} catch (NodeNotConnectedException e) {
// Ignore. We will manually create an iterator of open nodes
}
} }
Set<DiscoveryNode> allConnectionNodes = getAllConnectedNodes();
for (DiscoveryNode connectedNode : allConnectionNodes) {
try {
return delegate.getConnection(connectedNode);
} catch (NodeNotConnectedException e) {
// Ignore. We will try the next one until all are exhausted.
}
}
throw new NoSuchRemoteClusterException(clusterAlias);
}
@Override
public Set<DiscoveryNode> getAllConnectedNodes() {
return delegate.getAllConnectedNodes();
} }
@Override @Override
public int size() { public int size() {
// Although we use a delegate instance, we report the connection manager size based on the return delegate.size();
// RemoteConnectionManager's knowledge of the connections. This is because there is a brief window
// in between the time when the connection is added to the delegate map, and the time when
// nodeConnected is called.
return this.connections.size();
} }
@Override @Override
@ -127,22 +141,22 @@ public class RemoteConnectionManager implements ConnectionManager {
delegate.closeNoBlock(); delegate.closeNoBlock();
} }
private synchronized void addConnection(Transport.Connection addedConnection) { private synchronized void addConnectedNode(DiscoveryNode addedNode) {
ArrayList<Transport.Connection> newConnections = new ArrayList<>(this.connections); ArrayList<DiscoveryNode> newConnections = new ArrayList<>(this.connectedNodes);
newConnections.add(addedConnection); newConnections.add(addedNode);
this.connections = Collections.unmodifiableList(newConnections); this.connectedNodes = Collections.unmodifiableList(newConnections);
} }
private synchronized void removeConnection(Transport.Connection removedConnection) { private synchronized void removeConnectedNode(DiscoveryNode removedNode) {
int newSize = this.connections.size() - 1; int newSize = this.connectedNodes.size() - 1;
ArrayList<Transport.Connection> newConnections = new ArrayList<>(newSize); ArrayList<DiscoveryNode> newConnectedNodes = new ArrayList<>(newSize);
for (Transport.Connection connection : this.connections) { for (DiscoveryNode connectedNode : this.connectedNodes) {
if (connection.equals(removedConnection) == false) { if (connectedNode.equals(removedNode) == false) {
newConnections.add(connection); newConnectedNodes.add(connectedNode);
} }
} }
assert newConnections.size() == newSize : "Expected connection count: " + newSize + ", Found: " + newConnections.size(); assert newConnectedNodes.size() == newSize : "Expected connection node count: " + newSize + ", Found: " + newConnectedNodes.size();
this.connections = Collections.unmodifiableList(newConnections); this.connectedNodes = Collections.unmodifiableList(newConnectedNodes);
} }
static final class ProxyConnection implements Transport.Connection { static final class ProxyConnection implements Transport.Connection {

View File

@ -211,7 +211,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
private final int maxNumRemoteConnections; private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate; private final Predicate<DiscoveryNode> nodePredicate;
private final SetOnce<ClusterName> remoteClusterName = new SetOnce<>(); private final SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private volatile String proxyAddress; private final String proxyAddress;
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
Settings settings) { Settings settings) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportConnectionListener;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -111,6 +112,11 @@ public class StubbableConnectionManager implements ConnectionManager {
return delegate.size(); return delegate.size();
} }
@Override
public Set<DiscoveryNode> getAllConnectedNodes() {
return delegate.getAllConnectedNodes();
}
@Override @Override
public void close() { public void close() {
delegate.close(); delegate.close();