diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index e8dba1c28f1..76e63c12ab2 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -219,6 +219,7 @@ public class ClusterConnectionManager implements ConnectionManager { return connectedNodes.size(); } + @Override public Set getAllConnectedNodes() { return Collections.unmodifiableSet(connectedNodes.keySet()); } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 1f8a73d575d..7e47e04ad8d 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import java.io.Closeable; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; public interface ConnectionManager extends Closeable { @@ -43,6 +44,8 @@ public interface ConnectionManager extends Closeable { void disconnectFromNode(DiscoveryNode node); + Set getAllConnectedNodes(); + int size(); @Override diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index 8de880e9917..4e0c7289fad 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; public class RemoteConnectionManager implements ConnectionManager { @@ -33,7 +34,7 @@ public class RemoteConnectionManager implements ConnectionManager { private final String clusterAlias; private final ConnectionManager delegate; private final AtomicLong counter = new AtomicLong(); - private volatile List connections = Collections.emptyList(); + private volatile List connectedNodes = Collections.emptyList(); RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) { this.clusterAlias = clusterAlias; @@ -41,12 +42,12 @@ public class RemoteConnectionManager implements ConnectionManager { this.delegate.addListener(new TransportConnectionListener() { @Override public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) { - addConnection(connection); + addConnectedNode(node); } @Override public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { - removeConnection(connection); + removeConnectedNode(node); } }); } @@ -98,23 +99,36 @@ public class RemoteConnectionManager implements ConnectionManager { } public Transport.Connection getAnyRemoteConnection() { - List localConnections = this.connections; - if (localConnections.isEmpty()) { - throw new NoSuchRemoteClusterException(clusterAlias); - } else { - long curr; - while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE); - return localConnections.get(Math.toIntExact(Math.floorMod(curr, (long) localConnections.size()))); + List localConnectedNodes = this.connectedNodes; + long curr; + while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE); + if (localConnectedNodes.isEmpty() == false) { + DiscoveryNode nextNode = localConnectedNodes.get(Math.toIntExact(Math.floorMod(curr, (long) localConnectedNodes.size()))); + try { + return delegate.getConnection(nextNode); + } catch (NodeNotConnectedException e) { + // Ignore. We will manually create an iterator of open nodes + } } + Set allConnectionNodes = getAllConnectedNodes(); + for (DiscoveryNode connectedNode : allConnectionNodes) { + try { + return delegate.getConnection(connectedNode); + } catch (NodeNotConnectedException e) { + // Ignore. We will try the next one until all are exhausted. + } + } + throw new NoSuchRemoteClusterException(clusterAlias); + } + + @Override + public Set getAllConnectedNodes() { + return delegate.getAllConnectedNodes(); } @Override public int size() { - // Although we use a delegate instance, we report the connection manager size based on the - // RemoteConnectionManager's knowledge of the connections. This is because there is a brief window - // in between the time when the connection is added to the delegate map, and the time when - // nodeConnected is called. - return this.connections.size(); + return delegate.size(); } @Override @@ -127,22 +141,22 @@ public class RemoteConnectionManager implements ConnectionManager { delegate.closeNoBlock(); } - private synchronized void addConnection(Transport.Connection addedConnection) { - ArrayList newConnections = new ArrayList<>(this.connections); - newConnections.add(addedConnection); - this.connections = Collections.unmodifiableList(newConnections); + private synchronized void addConnectedNode(DiscoveryNode addedNode) { + ArrayList newConnections = new ArrayList<>(this.connectedNodes); + newConnections.add(addedNode); + this.connectedNodes = Collections.unmodifiableList(newConnections); } - private synchronized void removeConnection(Transport.Connection removedConnection) { - int newSize = this.connections.size() - 1; - ArrayList newConnections = new ArrayList<>(newSize); - for (Transport.Connection connection : this.connections) { - if (connection.equals(removedConnection) == false) { - newConnections.add(connection); + private synchronized void removeConnectedNode(DiscoveryNode removedNode) { + int newSize = this.connectedNodes.size() - 1; + ArrayList newConnectedNodes = new ArrayList<>(newSize); + for (DiscoveryNode connectedNode : this.connectedNodes) { + if (connectedNode.equals(removedNode) == false) { + newConnectedNodes.add(connectedNode); } } - assert newConnections.size() == newSize : "Expected connection count: " + newSize + ", Found: " + newConnections.size(); - this.connections = Collections.unmodifiableList(newConnections); + assert newConnectedNodes.size() == newSize : "Expected connection node count: " + newSize + ", Found: " + newConnectedNodes.size(); + this.connectedNodes = Collections.unmodifiableList(newConnectedNodes); } static final class ProxyConnection implements Transport.Connection { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 7798233974e..4e019b79921 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -211,7 +211,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final SetOnce remoteClusterName = new SetOnce<>(); - private volatile String proxyAddress; + private final String proxyAddress; SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, Settings settings) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 9bdc5df6542..a3ea412cc5e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -27,6 +27,7 @@ import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -111,6 +112,11 @@ public class StubbableConnectionManager implements ConnectionManager { return delegate.size(); } + @Override + public Set getAllConnectedNodes() { + return delegate.getAllConnectedNodes(); + } + @Override public void close() { delegate.close();