Send cluster name and discovery node in handshake (#48916)

This commits sends the cluster name and discovery naode in the transport
level handshake response. This will allow us to stop sending the
transport service level handshake request in the 8.0-8.x release cycle.
It is necessary to start sending this in 7.x so that 8.0 is guaranteed
to be communicating with a version that sends the required information.
This commit is contained in:
Tim Brooks 2019-11-11 18:42:02 -05:00 committed by GitHub
parent c320b499a0
commit 0645ee88e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 135 additions and 33 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
@ -151,7 +152,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays);
this.handshaker = new TransportHandshaker(version, threadPool,
this.handshaker = new TransportHandshaker(ClusterName.CLUSTER_NAME_SETTING.get(settings), version, threadPool,
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
TransportRequestOptions.EMPTY, v, false, true),
@ -167,6 +168,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected void doStart() {
}
@Override
public void setLocalNode(DiscoveryNode localNode) {
handshaker.setLocalNode(localNode);
}
@Override
public synchronized void setMessageListener(TransportMessageListener listener) {
outboundHandler.setMessageListener(listener);

View File

@ -52,6 +52,8 @@ public interface Transport extends LifecycleComponent {
void setMessageListener(TransportMessageListener listener);
void setLocalNode(DiscoveryNode localNode);
/**
* The address the transport is bound on.
*/

View File

@ -20,6 +20,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -46,19 +47,26 @@ final class TransportHandshaker {
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();
private final ClusterName clusterName;
private final Version version;
private final ThreadPool threadPool;
private final HandshakeRequestSender handshakeRequestSender;
private final HandshakeResponseSender handshakeResponseSender;
private volatile DiscoveryNode localNode;
TransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
TransportHandshaker(ClusterName clusterName, Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
HandshakeResponseSender handshakeResponseSender) {
this.clusterName = clusterName;
this.version = version;
this.threadPool = threadPool;
this.handshakeRequestSender = handshakeRequestSender;
this.handshakeResponseSender = handshakeResponseSender;
}
void setLocalNode(DiscoveryNode localNode) {
this.localNode = localNode;
}
void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
numHandshakes.inc();
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, version, listener);
@ -89,6 +97,9 @@ final class TransportHandshaker {
}
void handleHandshake(Version version, Set<String> features, TcpChannel channel, long requestId, StreamInput stream) throws IOException {
// The TransportService blocks incoming requests until this has been set.
assert localNode != null : "Local node must be set before handshake is handled";
// Must read the handshake request to exhaust the stream
HandshakeRequest handshakeRequest = new HandshakeRequest(stream);
final int nextByte = stream.read();
@ -96,7 +107,7 @@ final class TransportHandshaker {
throw new IllegalStateException("Handshake request not fully read for requestId [" + requestId + "], action ["
+ TransportHandshaker.HANDSHAKE_ACTION_NAME + "], available [" + stream.available() + "]; resetting");
}
HandshakeResponse response = new HandshakeResponse(this.version);
HandshakeResponse response = new HandshakeResponse(handshakeRequest.version, this.version, this.clusterName, this.localNode);
handshakeResponseSender.sendResponse(version, features, channel, response, requestId);
}
@ -127,13 +138,13 @@ final class TransportHandshaker {
@Override
public HandshakeResponse read(StreamInput in) throws IOException {
return new HandshakeResponse(in);
return new HandshakeResponse(this.currentVersion, in);
}
@Override
public void handleResponse(HandshakeResponse response) {
if (isDone.compareAndSet(false, true)) {
Version version = response.responseVersion;
Version version = response.version;
if (currentVersion.isCompatible(version) == false) {
listener.onFailure(new IllegalStateException("Received message from unsupported version: [" + version
+ "] minimal compatible version is: [" + currentVersion.minimumCompatibilityVersion() + "]"));
@ -201,25 +212,58 @@ final class TransportHandshaker {
static final class HandshakeResponse extends TransportResponse {
private final Version responseVersion;
private final Version requestVersion;
private final Version version;
private final ClusterName clusterName;
private final DiscoveryNode discoveryNode;
HandshakeResponse(Version responseVersion) {
this.responseVersion = responseVersion;
HandshakeResponse(Version requestVersion, Version responseVersion, ClusterName clusterName, DiscoveryNode discoveryNode) {
this.requestVersion = requestVersion;
this.version = responseVersion;
this.clusterName = clusterName;
this.discoveryNode = discoveryNode;
}
private HandshakeResponse(StreamInput in) throws IOException {
private HandshakeResponse(Version requestVersion, StreamInput in) throws IOException {
super(in);
responseVersion = Version.readVersion(in);
this.requestVersion = requestVersion;
version = Version.readVersion(in);
// During the handshake process, nodes set their stream version to the minimum compatibility
// version they support. When deserializing the response, we use the version the other node
// told us that it actually is in the handshake response (`version`).
if (requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
clusterName = new ClusterName(in);
discoveryNode = new DiscoveryNode(in);
} else {
clusterName = null;
discoveryNode = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
assert responseVersion != null;
Version.writeVersion(responseVersion, out);
assert version != null;
Version.writeVersion(version, out);
// During the handshake process, nodes set their stream version to the minimum compatibility
// version they support. When deciding what response to send, we use the version the other node
// told us that it actually is in the handshake request (`requestVersion`). If it did not tell
// us a `requestVersion`, it is at least a pre-7.6 node.
if (requestVersion != null && requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
clusterName.writeTo(out);
discoveryNode.writeTo(out);
}
}
Version getResponseVersion() {
return responseVersion;
Version getVersion() {
return version;
}
ClusterName getClusterName() {
return clusterName;
}
DiscoveryNode getDiscoveryNode() {
return discoveryNode;
}
}

View File

@ -235,6 +235,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
}
localNode = localNodeFactory.apply(transport.boundAddress());
transport.setLocalNode(localNode);
if (connectToRemoteCluster) {
// here we start to connect to the remote clusters

View File

@ -229,4 +229,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
public void setMessageListener(TransportMessageListener listener) {
this.listener = listener;
}
@Override
public void setLocalNode(DiscoveryNode localNode) {
}
}

View File

@ -415,6 +415,10 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public void setMessageListener(TransportMessageListener listener) {
}
@Override
public void setLocalNode(DiscoveryNode localNode) {
}
@Override
public BoundTransportAddress boundAddress() {
return null;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -58,9 +59,8 @@ public class InboundHandlerTests extends ESTestCase {
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {
}, (v, f, c, r, r_id) -> {
});
TransportHandshaker handshaker = new TransportHandshaker(new ClusterName("cluster-name"), version, threadPool, (n, c, r, v) -> {
}, (v, f, c, r, r_id) -> {});
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
OutboundHandler outboundHandler =
new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE);

View File

@ -20,9 +20,12 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
@ -30,6 +33,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -42,23 +46,29 @@ import static org.mockito.Mockito.verify;
public class TransportHandshakerTests extends ESTestCase {
private TransportHandshaker handshaker;
private DiscoveryNode node;
private DiscoveryNode remoteNode;
private TcpChannel channel;
private TestThreadPool threadPool;
private TransportHandshaker.HandshakeRequestSender requestSender;
private TransportHandshaker.HandshakeResponseSender responseSender;
private ClusterName clusterName;
private DiscoveryNode localNode;
@Override
@SuppressForbidden(reason = "Allow accessing localhost")
public void setUp() throws Exception {
super.setUp();
String nodeId = "node-id";
String nodeId = "remote-node-id";
channel = mock(TcpChannel.class);
requestSender = mock(TransportHandshaker.HandshakeRequestSender.class);
responseSender = mock(TransportHandshaker.HandshakeResponseSender.class);
node = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.emptySet(), Version.CURRENT);
remoteNode = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
threadPool = new TestThreadPool("thread-poll");
handshaker = new TransportHandshaker(Version.CURRENT, threadPool, requestSender, responseSender);
clusterName = new ClusterName("cluster");
localNode = new DiscoveryNode("local-node-id", new TransportAddress(InetAddress.getLocalHost(), 0), Version.CURRENT);
handshaker = new TransportHandshaker(clusterName, Version.CURRENT, threadPool, requestSender, responseSender);
handshaker.setLocalNode(localNode);
}
@Override
@ -70,9 +80,9 @@ public class TransportHandshakerTests extends ESTestCase {
public void testHandshakeRequestAndResponse() throws IOException {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
assertFalse(versionFuture.isDone());
@ -88,18 +98,39 @@ public class TransportHandshakerTests extends ESTestCase {
verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(),
eq(reqId));
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
handler.handleResponse((TransportHandshaker.HandshakeResponse) responseCaptor.getValue());
assertTrue(versionFuture.isDone());
assertEquals(Version.CURRENT, versionFuture.actionGet());
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
assertEquals(Version.CURRENT, response.getVersion());
assertEquals(clusterName, response.getClusterName());
assertEquals(localNode, response.getDiscoveryNode());
}
public void testHandshakeRequestAndResponsePreV7_6() throws IOException {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
try (BytesStreamOutput out = new BytesStreamOutput()) {
new TransportHandshaker.HandshakeResponse(Version.V_7_5_0, Version.V_7_5_0, clusterName, localNode).writeTo(out);
TransportHandshaker.HandshakeResponse response = handler.read(out.bytes().streamInput());
assertEquals(Version.V_7_5_0, response.getVersion());
// When writing or reading a 6.6 stream, these are not serialized
assertNull(response.getDiscoveryNode());
assertNull(response.getClusterName());
}
}
public void testHandshakeRequestFutureVersionsCompatibility() throws IOException {
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
TcpChannel mockChannel = mock(TcpChannel.class);
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT);
@ -131,15 +162,15 @@ public class TransportHandshakerTests extends ESTestCase {
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
assertEquals(Version.CURRENT, response.getResponseVersion());
assertEquals(Version.CURRENT, response.getVersion());
}
public void testHandshakeError() throws IOException {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
assertFalse(versionFuture.isDone());
@ -155,9 +186,9 @@ public class TransportHandshakerTests extends ESTestCase {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
Version compatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
doThrow(new IOException("boom")).when(requestSender).sendRequest(node, channel, reqId, compatibilityVersion);
doThrow(new IOException("boom")).when(requestSender).sendRequest(remoteNode, channel, reqId, compatibilityVersion);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
assertTrue(versionFuture.isDone());
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
@ -168,9 +199,9 @@ public class TransportHandshakerTests extends ESTestCase {
public void testHandshakeTimeout() throws IOException {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
assertThat(cte.getMessage(), containsString("handshake_timeout"));

View File

@ -257,6 +257,10 @@ public class MockTransport implements Transport, LifecycleComponent {
return requestHandlers.get(action);
}
@Override
public void setLocalNode(DiscoveryNode localNode) {
}
@Override
public void setMessageListener(TransportMessageListener listener) {
if (this.listener != null) {

View File

@ -101,6 +101,11 @@ public final class StubbableTransport implements Transport {
delegate.setMessageListener(listener);
}
@Override
public void setLocalNode(DiscoveryNode localNode) {
delegate.setLocalNode(localNode);
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
delegate.registerRequestHandler(reg);