mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 09:28:27 +00:00
It is permitted for nodes to accept transport connections at addresses other than their publish address, which allows a good deal of flexibility when configuring discovery. However, it is not unusual for users to misconfigure a node so that it picks a publish address which is inaccessible to other nodes. We see this happen a lot if the nodes are on different networks separated by a proxy, or if the nodes are running in Docker with the wrong kind of network config.

When this happens we offer no useful feedback to the user unless they enable TRACE-level logs. It's particularly tricky to diagnose because if we test connectivity between the nodes (using their discovery addresses) then all will appear well.

This commit adds a WARN-level log message if this kind of misconfiguration is detected, i.e. the probe connection has succeeded (indicating that we are really talking to a healthy Elasticsearch node) but the follow-up connection attempt to the node's publish address fails.

It also tidies up some loose ends in `HandshakingTransportAddressConnector`, removing some TODOs that need not be completed, and registering its accidentally-unregistered timeout settings.
This commit is contained in:
parent
feb2a25761
commit
0152c40724
@ -69,6 +69,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
|
||||
import org.elasticsearch.discovery.PeerFinder;
|
||||
import org.elasticsearch.discovery.SeedHostsResolver;
|
||||
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
|
||||
@ -529,6 +530,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
|
||||
ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING,
|
||||
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING,
|
||||
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
|
||||
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
|
||||
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
|
||||
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
|
||||
|
||||
|
@ -74,7 +74,8 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
// TODO if transportService is already connected to this address then skip the handshaking
|
||||
// We could skip this if the transportService were already connected to the given address, but the savings would be minimal
|
||||
// so we open a new connection anyway.
|
||||
|
||||
final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(),
|
||||
UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests
|
||||
@ -101,21 +102,28 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
|
||||
if (remoteNode.equals(transportService.getLocalNode())) {
|
||||
// TODO cache this result for some time? forever?
|
||||
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
|
||||
} else if (remoteNode.isMasterNode() == false) {
|
||||
// TODO cache this result for some time?
|
||||
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
|
||||
} else {
|
||||
transportService.connectToNode(remoteNode, new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void ignored) {
|
||||
logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode);
|
||||
logger.trace("[{}] completed full connection with [{}]",
|
||||
thisConnectionAttempt, remoteNode);
|
||||
listener.onResponse(remoteNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// we opened a connection and successfully performed a handshake, so we're definitely
|
||||
// talking to a master-eligible node with a matching cluster name and a good version,
|
||||
// but the attempt to open a full connection to its publish address failed; a common
|
||||
// reason is that the remote node is listening on 0.0.0.0 but has made an inappropriate
|
||||
// choice for its publish address.
|
||||
logger.warn(new ParameterizedMessage(
|
||||
"[{}] completed handshake with [{}] but followup connection failed",
|
||||
thisConnectionAttempt, remoteNode), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
|
@ -19,16 +19,27 @@
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportService.HandshakeResponse;
|
||||
@ -44,10 +55,13 @@ import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
|
||||
import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.oneOf;
|
||||
|
||||
public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
|
||||
private DiscoveryNode remoteNode;
|
||||
private TransportAddress discoveryAddress;
|
||||
private TransportService transportService;
|
||||
private ThreadPool threadPool;
|
||||
private String remoteClusterName;
|
||||
@ -55,6 +69,8 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
private DiscoveryNode localNode;
|
||||
|
||||
private boolean dropHandshake;
|
||||
@Nullable // unless we want the full connection to fail
|
||||
private TransportException fullConnectionFailure;
|
||||
|
||||
@Before
|
||||
public void startServices() {
|
||||
@ -66,17 +82,24 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
threadPool = new TestThreadPool("node", settings);
|
||||
|
||||
remoteNode = null;
|
||||
discoveryAddress = null;
|
||||
remoteClusterName = null;
|
||||
dropHandshake = false;
|
||||
fullConnectionFailure = null;
|
||||
|
||||
final MockTransport mockTransport = new MockTransport() {
|
||||
@Override
|
||||
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
|
||||
super.onSendRequest(requestId, action, request, node);
|
||||
assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
|
||||
assertEquals(remoteNode.getAddress(), node.getAddress());
|
||||
assertThat(discoveryAddress, notNullValue());
|
||||
assertThat(node.getAddress(), oneOf(discoveryAddress, remoteNode.getAddress()));
|
||||
if (dropHandshake == false) {
|
||||
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
|
||||
if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) {
|
||||
handleError(requestId, fullConnectionFailure);
|
||||
} else {
|
||||
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -91,7 +114,7 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopServices() throws InterruptedException {
|
||||
public void stopServices() {
|
||||
transportService.stop();
|
||||
terminate(threadPool);
|
||||
}
|
||||
@ -102,8 +125,9 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
|
||||
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
remoteClusterName = "local-cluster";
|
||||
discoveryAddress = getDiscoveryAddress();
|
||||
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, new ActionListener<DiscoveryNode>() {
|
||||
@Override
|
||||
public void onResponse(DiscoveryNode discoveryNode) {
|
||||
receivedNode.set(discoveryNode);
|
||||
@ -120,44 +144,84 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
|
||||
assertEquals(remoteNode, receivedNode.get());
|
||||
}
|
||||
|
||||
@TestLogging(reason="ensure logging happens", value="org.elasticsearch.discovery.HandshakingTransportAddressConnector:INFO")
|
||||
public void testLogsFullConnectionFailureAfterSuccessfulHandshake() throws Exception {
|
||||
|
||||
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
remoteClusterName = "local-cluster";
|
||||
discoveryAddress = buildNewFakeTransportAddress();
|
||||
|
||||
fullConnectionFailure = new ConnectTransportException(remoteNode, "simulated", new ElasticsearchException("root cause"));
|
||||
|
||||
FailureListener failureListener = new FailureListener();
|
||||
|
||||
MockLogAppender mockAppender = new MockLogAppender();
|
||||
mockAppender.start();
|
||||
mockAppender.addExpectation(
|
||||
new MockLogAppender.SeenEventExpectation(
|
||||
"message",
|
||||
HandshakingTransportAddressConnector.class.getCanonicalName(),
|
||||
Level.WARN,
|
||||
"*completed handshake with [*] but followup connection failed*"));
|
||||
Logger targetLogger = LogManager.getLogger(HandshakingTransportAddressConnector.class);
|
||||
Loggers.addAppender(targetLogger, mockAppender);
|
||||
|
||||
try {
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
|
||||
failureListener.assertFailure();
|
||||
mockAppender.assertAllExpectationsMatched();
|
||||
} finally {
|
||||
Loggers.removeAppender(targetLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
|
||||
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
discoveryAddress = getDiscoveryAddress();
|
||||
remoteClusterName = "local-cluster";
|
||||
|
||||
FailureListener failureListener = new FailureListener();
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
|
||||
failureListener.assertFailure();
|
||||
}
|
||||
|
||||
public void testDoesNotConnectToLocalNode() throws Exception {
|
||||
remoteNode = localNode;
|
||||
discoveryAddress = getDiscoveryAddress();
|
||||
remoteClusterName = "local-cluster";
|
||||
|
||||
FailureListener failureListener = new FailureListener();
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
|
||||
failureListener.assertFailure();
|
||||
}
|
||||
|
||||
public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
|
||||
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
discoveryAddress = getDiscoveryAddress();
|
||||
remoteClusterName = "another-cluster";
|
||||
|
||||
FailureListener failureListener = new FailureListener();
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
|
||||
failureListener.assertFailure();
|
||||
}
|
||||
|
||||
public void testHandshakeTimesOut() throws InterruptedException {
|
||||
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
discoveryAddress = getDiscoveryAddress();
|
||||
remoteClusterName = "local-cluster";
|
||||
dropHandshake = true;
|
||||
|
||||
FailureListener failureListener = new FailureListener();
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
|
||||
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
|
||||
Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
|
||||
failureListener.assertFailure();
|
||||
}
|
||||
|
||||
private TransportAddress getDiscoveryAddress() {
|
||||
return randomBoolean() ? remoteNode.getAddress() : buildNewFakeTransportAddress();
|
||||
}
|
||||
|
||||
private class FailureListener implements ActionListener<DiscoveryNode> {
|
||||
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user