Wait for blackholed connection before discovery (#44077)

Since #42636 we no longer treat connections specially when simulating a
blackholed connection. This means that at the end of the safety phase we may
have just started a connection attempt that is going to time out. The default
connect timeout is 30 seconds, much longer than the 2 seconds we normally allow
for post-safety-phase discovery, so discovery may not complete within that
window. This commit adds enough extra time for such a connection attempt to
time out.

It also fixes some spurious logging of `this` that now refers to an object with
an unhelpful `toString()` implementation introduced in #42636.

Fixes #44073
This commit is contained in:
David Turner 2019-07-09 10:59:22 +01:00
parent 748a10866d
commit 268971db03
2 changed files with 10 additions and 7 deletions

View File

@ -70,9 +70,10 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
@Override
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
private final AbstractRunnable thisConnectionAttempt = this;
@Override
protected void doRun() {
// TODO if transportService is already connected to this address then skip the handshaking
final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(),
@ -80,13 +81,13 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
transportAddress.address().getHostString(), transportAddress.getAddress(), transportAddress, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
logger.trace("[{}] opening probe connection", this);
logger.trace("[{}] opening probe connection", thisConnectionAttempt);
transportService.openConnection(targetNode,
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout,
TimeValue.MINUS_ONE, null), new ActionListener<Connection>() {
@Override
public void onResponse(Connection connection) {
logger.trace("[{}] opened probe connection", this);
logger.trace("[{}] opened probe connection", thisConnectionAttempt);
// use NotifyOnceListener to make sure the following line does not result in onFailure being called when
// the connection is closed in the onResponse handler
@ -96,7 +97,7 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
protected void innerOnResponse(DiscoveryNode remoteNode) {
try {
// success means (amongst other things) that the cluster names match
logger.trace("[{}] handshake successful: {}", this, remoteNode);
logger.trace("[{}] handshake successful: {}", thisConnectionAttempt, remoteNode);
IOUtils.closeWhileHandlingException(connection);
if (remoteNode.equals(transportService.getLocalNode())) {
@ -109,7 +110,7 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
transportService.connectToNode(remoteNode, new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
logger.trace("[{}] full connection successful: {}", this, remoteNode);
logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode);
listener.onResponse(remoteNode);
}
@ -129,7 +130,7 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
// we opened a connection and successfully performed a low-level handshake, so we were definitely
// talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of
// mismatched configurations (e.g. cluster name) that the user should address
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
logger.warn(new ParameterizedMessage("handshake failed for [{}]", thisConnectionAttempt), e);
IOUtils.closeWhileHandlingException(connection);
listener.onFailure(e);
}

View File

@ -121,6 +121,7 @@ import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERV
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.elasticsearch.transport.TransportSettings.CONNECT_TIMEOUT;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -556,7 +557,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
runFor(defaultMillis(CONNECT_TIMEOUT) + // may be in a prior connection attempt which has been blackholed
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration();
} else {