From b84d1e2577b131cdb2531017a7b926655f549d0d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 7 May 2020 13:11:56 -0600 Subject: [PATCH] Improve logging around SniffConnectionStrategy (#56378) Currently, the logging around the SniffConnectionStrategy is limited. The log messages are inconsistent and sometimes wrong. This commit cleans up these log messages to describe when connections are happening and what failed if a step fails. Additionally, this commit enables TRACE logging for a problematic test (testEnsureWeReconnect). --- .../transport/SniffConnectionStrategy.java | 38 +++++++++------ .../transport/RemoteClusterClientTests.java | 47 ++++++++----------- 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 4e019b79921..79eef4eea6e 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -296,18 +296,17 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { // ISE if we fail the handshake with an version incompatible node if (seedNodes.hasNext()) { logger.debug(() -> new ParameterizedMessage( - "fetching nodes from external cluster [{}] failed moving to next node", clusterAlias), e); + "fetching nodes from external cluster [{}] failed moving to next seed node", clusterAlias), e); collectRemoteNodes(seedNodes, listener); return; } } - logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e); + logger.warn(new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e); listener.onFailure(e); }; final DiscoveryNode seedNode = seedNodes.next().get(); - logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode, - 
proxyAddress); + logger.trace("[{}] opening transient connection to seed node: [{}]", clusterAlias, seedNode); final StepListener openConnectionStep = new StepListener<>(); try { connectionManager.openConnection(seedNode, null, openConnectionStep); @@ -324,17 +323,21 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { final StepListener fullConnectionStep = new StepListener<>(); handshakeStep.whenComplete(handshakeResponse -> { - final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); + final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) { - connectionManager.connectToNode(handshakeNode, null, - transportService.connectionValidator(handshakeNode), fullConnectionStep); + logger.trace("[{}] opening managed connection to seed node: [{}] proxy address: [{}]", clusterAlias, handshakeNode, + proxyAddress); + final DiscoveryNode handshakeNodeWithProxy = maybeAddProxyAddress(proxyAddress, handshakeNode); + connectionManager.connectToNode(handshakeNodeWithProxy, null, + transportService.connectionValidator(handshakeNodeWithProxy), fullConnectionStep); } else { fullConnectionStep.onResponse(null); } }, e -> { final Transport.Connection connection = openConnectionStep.result(); - logger.warn(new ParameterizedMessage("failed to connect to seed node [{}]", connection.getNode()), e); + final DiscoveryNode node = connection.getNode(); + logger.debug(() -> new ParameterizedMessage("[{}] failed to handshake with seed node: [{}]", clusterAlias, node), e); IOUtils.closeWhileHandlingException(connection); onFailure.accept(e); }); @@ -366,6 +369,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { responseHandler); } }, e -> { + final Transport.Connection connection = openConnectionStep.result(); + final DiscoveryNode node = connection.getNode(); + logger.debug(() -> new ParameterizedMessage( + 
"[{}] failed to open managed connection to seed node: [{}]", clusterAlias, node), e); IOUtils.closeWhileHandlingException(openConnectionStep.result()); onFailure.accept(e); }); @@ -400,9 +407,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { private void handleNodes(Iterator nodesIter) { while (nodesIter.hasNext()) { - final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next()); + final DiscoveryNode node = nodesIter.next(); if (nodePredicate.test(node) && shouldOpenMoreConnections()) { - connectionManager.connectToNode(node, null, + logger.trace("[{}] opening managed connection to node: [{}] proxy address: [{}]", clusterAlias, node, proxyAddress); + final DiscoveryNode nodeWithProxy = maybeAddProxyAddress(proxyAddress, node); + connectionManager.connectToNode(nodeWithProxy, null, transportService.connectionValidator(node), new ActionListener() { @Override public void onResponse(Void aVoid) { @@ -414,11 +423,12 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { if (e instanceof ConnectTransportException || e instanceof IllegalStateException) { // ISE if we fail the handshake with an version incompatible node // fair enough we can't connect just move on - logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), e); + logger.debug(() -> new ParameterizedMessage( + "[{}] failed to open managed connection to node [{}]", clusterAlias, node), e); handleNodes(nodesIter); } else { - logger.warn(() -> - new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), e); + logger.warn(new ParameterizedMessage( + "[{}] failed to open managed connection to node [{}]", clusterAlias, node), e); IOUtils.closeWhileHandlingException(connection); collectRemoteNodes(seedNodes, listener); } @@ -441,7 +451,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { @Override public void handleException(TransportException exp) { - logger.warn(() -> 
new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp); + logger.warn(new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp); try { IOUtils.closeWhileHandlingException(connection); } finally { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 9bea51ddfef..32a1db5dfeb 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -20,18 +20,19 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; @@ -76,6 +77,9 @@ public class RemoteClusterClientTests extends ESTestCase { } } + @TestLogging( + value = "org.elasticsearch.transport.SniffConnectionStrategy:TRACE,org.elasticsearch.transport.ClusterConnectionManager:TRACE", + reason = "debug intermittent test failure") public void testEnsureWeReconnect() throws Exception { Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), 
"foo_bar_cluster").build(); try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, @@ -86,37 +90,26 @@ public class RemoteClusterClientTests extends ESTestCase { .put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { - Semaphore semaphore = new Semaphore(1); service.start(); - service.getRemoteClusterService().getConnections().forEach(con -> { - con.getConnectionManager().addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { - if (remoteNode.equals(node)) { - semaphore.release(); - } - } - }); - }); // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have // the right calls in place in the RemoteAwareClient service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); for (int i = 0; i < 10; i++) { - semaphore.acquire(); - try { - service.getRemoteClusterService().getConnections().forEach(con -> { - con.getConnectionManager().disconnectFromNode(remoteNode); - }); - semaphore.acquire(); - RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); - ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); - assertNotNull(clusterStateResponse); - assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); - } finally { - semaphore.release(); - } + 
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); + ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); + Transport.Connection connection = connectionManager.getConnection(remoteNode); + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + connection.addCloseListener(closeFuture); + connectionManager.disconnectFromNode(remoteNode); + closeFuture.get(); + + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); } } }