From 195a5247d4becdbf6e23c0c7af4475a9ec17ac09 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 13 May 2020 16:57:45 -0600 Subject: [PATCH] Prevent connection races in testEnsureWeReconnect (#56654) Currently it is possible that a sniff connection round is occurring as we enter another test loop in testEnsureWeReconnect. The problem is that once we enter another loop, closing the connection manually can cause this pre-existing connection round to fail. This round failing can fail the test. This commit fixes the issue by ensuring that there are no in-progress connections before entering another loop. --- .../elasticsearch/transport/ProxyConnectionStrategy.java | 3 --- .../elasticsearch/transport/RemoteConnectionStrategy.java | 6 +++--- .../elasticsearch/transport/SniffConnectionStrategy.java | 4 ---- .../elasticsearch/transport/RemoteClusterClientTests.java | 3 ++- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 024f49300a5..bae8ea072ee 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -19,8 +19,6 @@ package org.elasticsearch.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -83,7 +81,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { static final int CHANNELS_PER_CONNECTION = 1; private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3; - private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class); private final int maxNumConnections; private final String configuredAddress; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index fa4a5206f83..9232748e181 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -111,7 +111,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis private final int maxPendingConnectionListeners; - private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); + protected final Logger logger = LogManager.getLogger(getClass()); private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); @@ -313,8 +313,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis if (shouldOpenMoreConnections()) { // try to reconnect and fill up the slot of the disconnected node connect(ActionListener.wrap( - ignore -> logger.trace("successfully connected after disconnect of {}", node), - e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", node), e))); + ignore -> logger.trace("[{}] successfully connected after disconnect of {}", clusterAlias, node), + e -> logger.debug(() -> new ParameterizedMessage("[{}] failed to connect after disconnect of {}", clusterAlias, node), e))); } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 79eef4eea6e..202cbff9ad9 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -19,8 +19,6 @@ package org.elasticsearch.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; @@ -200,8 +198,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { static final int CHANNELS_PER_CONNECTION = 6; - private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class); - private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 32a1db5dfeb..596a3ab70b3 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -98,6 +98,7 @@ public class RemoteClusterClientTests extends ESTestCase { assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); for (int i = 0; i < 10; i++) { RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); + assertBusy(remoteClusterConnection::assertNoRunningConnections); ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); Transport.Connection connection = connectionManager.getConnection(remoteNode); PlainActionFuture closeFuture = PlainActionFuture.newFuture(); @@ -109,7 +110,7 @@ public class RemoteClusterClientTests extends ESTestCase { ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertTrue(remoteClusterConnection.isNodeConnected(remoteNode)); } } }