[Zen2] Add HandshakingTransportAddressConnector (#32643)

The `PeerFinder`, introduced in #32246, needs to be able to identify, and
connect to, a remote master node using only its `TransportAddress`. This can be
done by opening a single-channel connection to the address, performing a
handshake, and only then forming a full-blown connection to the node. This
change implements this logic.
This commit is contained in:
David Turner 2018-08-07 13:34:07 +01:00 committed by GitHub
parent 2176184db1
commit 289e34aeed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 300 additions and 5 deletions

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport.Connection;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportService;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class HandshakingTransportAddressConnector extends AbstractComponent implements TransportAddressConnector {
// connection timeout for probes
public static final Setting<TimeValue> PROBE_CONNECT_TIMEOUT_SETTING =
Setting.timeSetting("discovery.probe.connect_timeout",
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// handshake timeout for probes
public static final Setting<TimeValue> PROBE_HANDSHAKE_TIMEOUT_SETTING =
Setting.timeSetting("discovery.probe.handshake_timeout",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
private final TransportService transportService;
private final TimeValue probeConnectTimeout;
private final TimeValue probeHandshakeTimeout;
public HandshakingTransportAddressConnector(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
probeConnectTimeout = PROBE_CONNECT_TIMEOUT_SETTING.get(settings);
probeHandshakeTimeout = PROBE_HANDSHAKE_TIMEOUT_SETTING.get(settings);
}
@Override
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
// TODO if transportService is already connected to this address then skip the handshaking
final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
logger.trace("[{}] opening probe connection", this);
final Connection connection = transportService.openConnection(targetNode,
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout));
logger.trace("[{}] opened probe connection", this);
final DiscoveryNode remoteNode;
try {
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
// success means (amongst other things) that the cluster names match
logger.trace("[{}] handshake successful: {}", this, remoteNode);
} finally {
IOUtils.closeWhileHandlingException(connection);
}
if (remoteNode.equals(transportService.getLocalNode())) {
// TODO cache this result for some time? forever?
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
} else if (remoteNode.isMasterNode() == false) {
// TODO cache this result for some time?
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
} else {
transportService.connectToNode(remoteNode);
logger.trace("[{}] full connection successful: {}", this, remoteNode);
listener.onResponse(remoteNode);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
public String toString() {
return "connectToRemoteMasterNode[" + transportAddress + "]";
}
});
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportService.HandshakeResponse;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
public class HandshakingTransportAddressConnectorTests extends ESTestCase {
private DiscoveryNode remoteNode;
private TransportService transportService;
private ThreadPool threadPool;
private String remoteClusterName;
private HandshakingTransportAddressConnector handshakingTransportAddressConnector;
private DiscoveryNode localNode;
private boolean dropHandshake;
@Before
public void startServices() {
localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(CLUSTER_NAME_SETTING.getKey(), "local-cluster")
.build();
threadPool = new TestThreadPool("node", settings);
remoteNode = null;
remoteClusterName = null;
dropHandshake = false;
final CapturingTransport capturingTransport = new CapturingTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
super.onSendRequest(requestId, action, request, node);
assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
assertEquals(remoteNode.getAddress(), node.getAddress());
assertNotEquals(remoteNode, node);
if (dropHandshake == false) {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
}
}
};
transportService = new TransportService(settings, capturingTransport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
handshakingTransportAddressConnector = new HandshakingTransportAddressConnector(settings, transportService);
}
@After
public void stopServices() throws InterruptedException {
transportService.stop();
terminate(threadPool);
}
public void testConnectsToMasterNode() throws InterruptedException {
final CountDownLatch completionLatch = new CountDownLatch(1);
final SetOnce<DiscoveryNode> receivedNode = new SetOnce<>();
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
remoteClusterName = "local-cluster";
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
@Override
public void onResponse(DiscoveryNode discoveryNode) {
receivedNode.set(discoveryNode);
completionLatch.countDown();
}
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
});
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
assertEquals(remoteNode, receivedNode.get());
}
public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
remoteClusterName = "local-cluster";
FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
failureListener.assertFailure();
}
public void testDoesNotConnectToLocalNode() throws Exception {
remoteNode = localNode;
remoteClusterName = "local-cluster";
FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
failureListener.assertFailure();
}
public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
remoteClusterName = "another-cluster";
FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
failureListener.assertFailure();
}
public void testHandshakeTimesOut() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
remoteClusterName = "local-cluster";
dropHandshake = true;
FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
failureListener.assertFailure();
}
private class FailureListener implements ActionListener<DiscoveryNode> {
final CountDownLatch completionLatch = new CountDownLatch(1);
@Override
public void onResponse(DiscoveryNode discoveryNode) {
fail(discoveryNode.toString());
}
@Override
public void onFailure(Exception e) {
completionLatch.countDown();
}
void assertFailure() throws InterruptedException {
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
}
}
}

View File

@ -204,18 +204,21 @@ public class CapturingTransport implements Transport {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
throws TransportException {
onSendRequest(requestId, action, request, node);
}
@Override
public void close() throws IOException {
public void close() {
}
};
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();