Improve logging around SniffConnectionStrategy (#56378)

Currently, the logging around the SniffConnectionStrategy is limited.
The log messages are inconsistent and sometimes wrong. This commit
cleans up these log message to describe when connections are happening
and what failed if a step fails.

Additionally, this commit enables TRACE logging for a problematic test
(testEnsureWeReconnect).
This commit is contained in:
Tim Brooks 2020-05-07 13:11:56 -06:00 committed by GitHub
parent 9d076364d7
commit b84d1e2577
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 41 deletions

View File

@ -296,18 +296,17 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
// ISE if we fail the handshake with an version incompatible node // ISE if we fail the handshake with an version incompatible node
if (seedNodes.hasNext()) { if (seedNodes.hasNext()) {
logger.debug(() -> new ParameterizedMessage( logger.debug(() -> new ParameterizedMessage(
"fetching nodes from external cluster [{}] failed moving to next node", clusterAlias), e); "fetching nodes from external cluster [{}] failed moving to next seed node", clusterAlias), e);
collectRemoteNodes(seedNodes, listener); collectRemoteNodes(seedNodes, listener);
return; return;
} }
} }
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e); logger.warn(new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e);
listener.onFailure(e); listener.onFailure(e);
}; };
final DiscoveryNode seedNode = seedNodes.next().get(); final DiscoveryNode seedNode = seedNodes.next().get();
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode, logger.trace("[{}] opening transient connection to seed node: [{}]", clusterAlias, seedNode);
proxyAddress);
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>(); final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
try { try {
connectionManager.openConnection(seedNode, null, openConnectionStep); connectionManager.openConnection(seedNode, null, openConnectionStep);
@ -324,17 +323,21 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
final StepListener<Void> fullConnectionStep = new StepListener<>(); final StepListener<Void> fullConnectionStep = new StepListener<>();
handshakeStep.whenComplete(handshakeResponse -> { handshakeStep.whenComplete(handshakeResponse -> {
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) { if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) {
connectionManager.connectToNode(handshakeNode, null, logger.trace("[{}] opening managed connection to seed node: [{}] proxy address: [{}]", clusterAlias, handshakeNode,
transportService.connectionValidator(handshakeNode), fullConnectionStep); proxyAddress);
final DiscoveryNode handshakeNodeWithProxy = maybeAddProxyAddress(proxyAddress, handshakeNode);
connectionManager.connectToNode(handshakeNodeWithProxy, null,
transportService.connectionValidator(handshakeNodeWithProxy), fullConnectionStep);
} else { } else {
fullConnectionStep.onResponse(null); fullConnectionStep.onResponse(null);
} }
}, e -> { }, e -> {
final Transport.Connection connection = openConnectionStep.result(); final Transport.Connection connection = openConnectionStep.result();
logger.warn(new ParameterizedMessage("failed to connect to seed node [{}]", connection.getNode()), e); final DiscoveryNode node = connection.getNode();
logger.debug(() -> new ParameterizedMessage("[{}] failed to handshake with seed node: [{}]", clusterAlias, node), e);
IOUtils.closeWhileHandlingException(connection); IOUtils.closeWhileHandlingException(connection);
onFailure.accept(e); onFailure.accept(e);
}); });
@ -366,6 +369,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
responseHandler); responseHandler);
} }
}, e -> { }, e -> {
final Transport.Connection connection = openConnectionStep.result();
final DiscoveryNode node = connection.getNode();
logger.debug(() -> new ParameterizedMessage(
"[{}] failed to open managed connection to seed node: [{}]", clusterAlias, node), e);
IOUtils.closeWhileHandlingException(openConnectionStep.result()); IOUtils.closeWhileHandlingException(openConnectionStep.result());
onFailure.accept(e); onFailure.accept(e);
}); });
@ -400,9 +407,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
private void handleNodes(Iterator<DiscoveryNode> nodesIter) { private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
while (nodesIter.hasNext()) { while (nodesIter.hasNext()) {
final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next()); final DiscoveryNode node = nodesIter.next();
if (nodePredicate.test(node) && shouldOpenMoreConnections()) { if (nodePredicate.test(node) && shouldOpenMoreConnections()) {
connectionManager.connectToNode(node, null, logger.trace("[{}] opening managed connection to node: [{}] proxy address: [{}]", clusterAlias, node, proxyAddress);
final DiscoveryNode nodeWithProxy = maybeAddProxyAddress(proxyAddress, node);
connectionManager.connectToNode(nodeWithProxy, null,
transportService.connectionValidator(node), new ActionListener<Void>() { transportService.connectionValidator(node), new ActionListener<Void>() {
@Override @Override
public void onResponse(Void aVoid) { public void onResponse(Void aVoid) {
@ -414,11 +423,12 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
if (e instanceof ConnectTransportException || e instanceof IllegalStateException) { if (e instanceof ConnectTransportException || e instanceof IllegalStateException) {
// ISE if we fail the handshake with an version incompatible node // ISE if we fail the handshake with an version incompatible node
// fair enough we can't connect just move on // fair enough we can't connect just move on
logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), e); logger.debug(() -> new ParameterizedMessage(
"[{}] failed to open managed connection to node [{}]", clusterAlias, node), e);
handleNodes(nodesIter); handleNodes(nodesIter);
} else { } else {
logger.warn(() -> logger.warn(new ParameterizedMessage(
new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), e); "[{}] failed to open managed connection to node [{}]", clusterAlias, node), e);
IOUtils.closeWhileHandlingException(connection); IOUtils.closeWhileHandlingException(connection);
collectRemoteNodes(seedNodes, listener); collectRemoteNodes(seedNodes, listener);
} }
@ -441,7 +451,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp); logger.warn(new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp);
try { try {
IOUtils.closeWhileHandlingException(connection); IOUtils.closeWhileHandlingException(connection);
} finally { } finally {

View File

@ -20,18 +20,19 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
@ -76,6 +77,9 @@ public class RemoteClusterClientTests extends ESTestCase {
} }
} }
@TestLogging(
value = "org.elasticsearch.transport.SniffConnectionStrategy:TRACE,org.elasticsearch.transport.ClusterConnectionManager:TRACE",
reason = "debug intermittent test failure")
public void testEnsureWeReconnect() throws Exception { public void testEnsureWeReconnect() throws Exception {
Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool,
@ -86,37 +90,26 @@ public class RemoteClusterClientTests extends ESTestCase {
.put("cluster.remote.test.seeds", .put("cluster.remote.test.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start(); service.start();
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (remoteNode.equals(node)) {
semaphore.release();
}
}
});
});
// this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
// the right calls in place in the RemoteAwareClient // the right calls in place in the RemoteAwareClient
service.acceptIncomingRequests(); service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
semaphore.acquire(); RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
try { ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
service.getRemoteClusterService().getConnections().forEach(con -> { Transport.Connection connection = connectionManager.getConnection(remoteNode);
con.getConnectionManager().disconnectFromNode(remoteNode); PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
}); connection.addCloseListener(closeFuture);
semaphore.acquire(); connectionManager.disconnectFromNode(remoteNode);
RemoteClusterService remoteClusterService = service.getRemoteClusterService(); closeFuture.get();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
assertNotNull(clusterStateResponse); ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); assertNotNull(clusterStateResponse);
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
} finally { assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
semaphore.release();
}
} }
} }
} }