Prevent connection races in testEnsureWeReconnect (#56654)

Currently it is possible that a sniff connection round is occurring as
we enter another test loop in testEnsureWeReconnect. The problem is that
once we enter another loop, closing the connection manually can cause
this pre-existing connection round to fail. This round failing can fail
the test. This commit fixes the issue by ensuring that there are no
in-progress connections before entering another loop.
This commit is contained in:
Tim Brooks 2020-05-13 16:57:45 -06:00
parent cb4d5f5042
commit 195a5247d4
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
4 changed files with 5 additions and 11 deletions

View File

@ -19,8 +19,6 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -83,7 +81,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
static final int CHANNELS_PER_CONNECTION = 1; static final int CHANNELS_PER_CONNECTION = 1;
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3; private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class);
private final int maxNumConnections; private final int maxNumConnections;
private final String configuredAddress; private final String configuredAddress;

View File

@ -111,7 +111,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
private final int maxPendingConnectionListeners; private final int maxPendingConnectionListeners;
private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); protected final Logger logger = LogManager.getLogger(getClass());
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final Object mutex = new Object(); private final Object mutex = new Object();
@ -313,8 +313,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
if (shouldOpenMoreConnections()) { if (shouldOpenMoreConnections()) {
// try to reconnect and fill up the slot of the disconnected node // try to reconnect and fill up the slot of the disconnected node
connect(ActionListener.wrap( connect(ActionListener.wrap(
ignore -> logger.trace("successfully connected after disconnect of {}", node), ignore -> logger.trace("[{}] successfully connected after disconnect of {}", clusterAlias, node),
e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", node), e))); e -> logger.debug(() -> new ParameterizedMessage("[{}] failed to connect after disconnect of {}", clusterAlias, node), e)));
} }
} }

View File

@ -19,8 +19,6 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version; import org.elasticsearch.Version;
@ -200,8 +198,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
static final int CHANNELS_PER_CONNECTION = 6; static final int CHANNELS_PER_CONNECTION = 6;
private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class);
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());

View File

@ -98,6 +98,7 @@ public class RemoteClusterClientTests extends ESTestCase {
assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
assertBusy(remoteClusterConnection::assertNoRunningConnections);
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
Transport.Connection connection = connectionManager.getConnection(remoteNode); Transport.Connection connection = connectionManager.getConnection(remoteNode);
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
@ -109,7 +110,7 @@ public class RemoteClusterClientTests extends ESTestCase {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse); assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
} }
} }
} }