This reverts commit 0645ee88e2
.
This commit is contained in:
parent
4974f56b25
commit
d8510be3d9
|
@ -27,7 +27,6 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -152,7 +151,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
||||
|
||||
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays);
|
||||
this.handshaker = new TransportHandshaker(ClusterName.CLUSTER_NAME_SETTING.get(settings), version, threadPool,
|
||||
this.handshaker = new TransportHandshaker(version, threadPool,
|
||||
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
|
||||
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
|
||||
TransportRequestOptions.EMPTY, v, false, true),
|
||||
|
@ -168,11 +167,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
protected void doStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLocalNode(DiscoveryNode localNode) {
|
||||
handshaker.setLocalNode(localNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setMessageListener(TransportMessageListener listener) {
|
||||
outboundHandler.setMessageListener(listener);
|
||||
|
|
|
@ -52,8 +52,6 @@ public interface Transport extends LifecycleComponent {
|
|||
|
||||
void setMessageListener(TransportMessageListener listener);
|
||||
|
||||
void setLocalNode(DiscoveryNode localNode);
|
||||
|
||||
/**
|
||||
* The address the transport is bound on.
|
||||
*/
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -47,26 +46,19 @@ final class TransportHandshaker {
|
|||
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
|
||||
private final CounterMetric numHandshakes = new CounterMetric();
|
||||
|
||||
private final ClusterName clusterName;
|
||||
private final Version version;
|
||||
private final ThreadPool threadPool;
|
||||
private final HandshakeRequestSender handshakeRequestSender;
|
||||
private final HandshakeResponseSender handshakeResponseSender;
|
||||
private volatile DiscoveryNode localNode;
|
||||
|
||||
TransportHandshaker(ClusterName clusterName, Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
|
||||
TransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
|
||||
HandshakeResponseSender handshakeResponseSender) {
|
||||
this.clusterName = clusterName;
|
||||
this.version = version;
|
||||
this.threadPool = threadPool;
|
||||
this.handshakeRequestSender = handshakeRequestSender;
|
||||
this.handshakeResponseSender = handshakeResponseSender;
|
||||
}
|
||||
|
||||
void setLocalNode(DiscoveryNode localNode) {
|
||||
this.localNode = localNode;
|
||||
}
|
||||
|
||||
void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
|
||||
numHandshakes.inc();
|
||||
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, version, listener);
|
||||
|
@ -97,9 +89,6 @@ final class TransportHandshaker {
|
|||
}
|
||||
|
||||
void handleHandshake(Version version, Set<String> features, TcpChannel channel, long requestId, StreamInput stream) throws IOException {
|
||||
// The TransportService blocks incoming requests until this has been set.
|
||||
assert localNode != null : "Local node must be set before handshake is handled";
|
||||
|
||||
// Must read the handshake request to exhaust the stream
|
||||
HandshakeRequest handshakeRequest = new HandshakeRequest(stream);
|
||||
final int nextByte = stream.read();
|
||||
|
@ -107,7 +96,7 @@ final class TransportHandshaker {
|
|||
throw new IllegalStateException("Handshake request not fully read for requestId [" + requestId + "], action ["
|
||||
+ TransportHandshaker.HANDSHAKE_ACTION_NAME + "], available [" + stream.available() + "]; resetting");
|
||||
}
|
||||
HandshakeResponse response = new HandshakeResponse(handshakeRequest.version, this.version, this.clusterName, this.localNode);
|
||||
HandshakeResponse response = new HandshakeResponse(this.version);
|
||||
handshakeResponseSender.sendResponse(version, features, channel, response, requestId);
|
||||
}
|
||||
|
||||
|
@ -138,13 +127,13 @@ final class TransportHandshaker {
|
|||
|
||||
@Override
|
||||
public HandshakeResponse read(StreamInput in) throws IOException {
|
||||
return new HandshakeResponse(this.currentVersion, in);
|
||||
return new HandshakeResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(HandshakeResponse response) {
|
||||
if (isDone.compareAndSet(false, true)) {
|
||||
Version version = response.version;
|
||||
Version version = response.responseVersion;
|
||||
if (currentVersion.isCompatible(version) == false) {
|
||||
listener.onFailure(new IllegalStateException("Received message from unsupported version: [" + version
|
||||
+ "] minimal compatible version is: [" + currentVersion.minimumCompatibilityVersion() + "]"));
|
||||
|
@ -212,58 +201,25 @@ final class TransportHandshaker {
|
|||
|
||||
static final class HandshakeResponse extends TransportResponse {
|
||||
|
||||
private final Version requestVersion;
|
||||
private final Version version;
|
||||
private final ClusterName clusterName;
|
||||
private final DiscoveryNode discoveryNode;
|
||||
private final Version responseVersion;
|
||||
|
||||
HandshakeResponse(Version requestVersion, Version responseVersion, ClusterName clusterName, DiscoveryNode discoveryNode) {
|
||||
this.requestVersion = requestVersion;
|
||||
this.version = responseVersion;
|
||||
this.clusterName = clusterName;
|
||||
this.discoveryNode = discoveryNode;
|
||||
HandshakeResponse(Version responseVersion) {
|
||||
this.responseVersion = responseVersion;
|
||||
}
|
||||
|
||||
private HandshakeResponse(Version requestVersion, StreamInput in) throws IOException {
|
||||
private HandshakeResponse(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
this.requestVersion = requestVersion;
|
||||
version = Version.readVersion(in);
|
||||
// During the handshake process, nodes set their stream version to the minimum compatibility
|
||||
// version they support. When deserializing the response, we use the version the other node
|
||||
// told us that it actually is in the handshake response (`version`).
|
||||
if (requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
|
||||
clusterName = new ClusterName(in);
|
||||
discoveryNode = new DiscoveryNode(in);
|
||||
} else {
|
||||
clusterName = null;
|
||||
discoveryNode = null;
|
||||
}
|
||||
responseVersion = Version.readVersion(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
assert version != null;
|
||||
Version.writeVersion(version, out);
|
||||
// During the handshake process, nodes set their stream version to the minimum compatibility
|
||||
// version they support. When deciding what response to send, we use the version the other node
|
||||
// told us that it actually is in the handshake request (`requestVersion`). If it did not tell
|
||||
// us a `requestVersion`, it is at least a pre-7.6 node.
|
||||
if (requestVersion != null && requestVersion.onOrAfter(Version.V_7_6_0) && version.onOrAfter(Version.V_7_6_0)) {
|
||||
clusterName.writeTo(out);
|
||||
discoveryNode.writeTo(out);
|
||||
}
|
||||
assert responseVersion != null;
|
||||
Version.writeVersion(responseVersion, out);
|
||||
}
|
||||
|
||||
Version getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
ClusterName getClusterName() {
|
||||
return clusterName;
|
||||
}
|
||||
|
||||
DiscoveryNode getDiscoveryNode() {
|
||||
return discoveryNode;
|
||||
Version getResponseVersion() {
|
||||
return responseVersion;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -236,7 +236,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
}
|
||||
}
|
||||
localNode = localNodeFactory.apply(transport.boundAddress());
|
||||
transport.setLocalNode(localNode);
|
||||
|
||||
if (connectToRemoteCluster) {
|
||||
// here we start to connect to the remote clusters
|
||||
|
|
|
@ -229,9 +229,4 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
public void setMessageListener(TransportMessageListener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLocalNode(DiscoveryNode localNode) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -415,10 +415,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
public void setMessageListener(TransportMessageListener listener) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLocalNode(DiscoveryNode localNode) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BoundTransportAddress boundAddress() {
|
||||
return null;
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -59,8 +58,9 @@ public class InboundHandlerTests extends ESTestCase {
|
|||
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
|
||||
TransportHandshaker handshaker = new TransportHandshaker(new ClusterName("cluster-name"), version, threadPool, (n, c, r, v) -> {
|
||||
}, (v, f, c, r, r_id) -> {});
|
||||
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {
|
||||
}, (v, f, c, r, r_id) -> {
|
||||
});
|
||||
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
|
||||
OutboundHandler outboundHandler =
|
||||
new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE);
|
||||
|
|
|
@ -20,12 +20,9 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -33,7 +30,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
|||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -46,29 +42,23 @@ import static org.mockito.Mockito.verify;
|
|||
public class TransportHandshakerTests extends ESTestCase {
|
||||
|
||||
private TransportHandshaker handshaker;
|
||||
private DiscoveryNode remoteNode;
|
||||
private DiscoveryNode node;
|
||||
private TcpChannel channel;
|
||||
private TestThreadPool threadPool;
|
||||
private TransportHandshaker.HandshakeRequestSender requestSender;
|
||||
private TransportHandshaker.HandshakeResponseSender responseSender;
|
||||
private ClusterName clusterName;
|
||||
private DiscoveryNode localNode;
|
||||
|
||||
@Override
|
||||
@SuppressForbidden(reason = "Allow accessing localhost")
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
String nodeId = "remote-node-id";
|
||||
String nodeId = "node-id";
|
||||
channel = mock(TcpChannel.class);
|
||||
requestSender = mock(TransportHandshaker.HandshakeRequestSender.class);
|
||||
responseSender = mock(TransportHandshaker.HandshakeResponseSender.class);
|
||||
remoteNode = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
|
||||
node = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||
Collections.emptySet(), Version.CURRENT);
|
||||
threadPool = new TestThreadPool("thread-poll");
|
||||
clusterName = new ClusterName("cluster");
|
||||
localNode = new DiscoveryNode("local-node-id", new TransportAddress(InetAddress.getLocalHost(), 0), Version.CURRENT);
|
||||
handshaker = new TransportHandshaker(clusterName, Version.CURRENT, threadPool, requestSender, responseSender);
|
||||
handshaker.setLocalNode(localNode);
|
||||
handshaker = new TransportHandshaker(Version.CURRENT, threadPool, requestSender, responseSender);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,9 +70,9 @@ public class TransportHandshakerTests extends ESTestCase {
|
|||
public void testHandshakeRequestAndResponse() throws IOException {
|
||||
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
|
||||
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
assertFalse(versionFuture.isDone());
|
||||
|
||||
|
@ -98,39 +88,18 @@ public class TransportHandshakerTests extends ESTestCase {
|
|||
verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(),
|
||||
eq(reqId));
|
||||
|
||||
|
||||
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
|
||||
handler.handleResponse((TransportHandshaker.HandshakeResponse) responseCaptor.getValue());
|
||||
|
||||
assertTrue(versionFuture.isDone());
|
||||
assertEquals(Version.CURRENT, versionFuture.actionGet());
|
||||
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
|
||||
assertEquals(Version.CURRENT, response.getVersion());
|
||||
assertEquals(clusterName, response.getClusterName());
|
||||
assertEquals(localNode, response.getDiscoveryNode());
|
||||
}
|
||||
|
||||
public void testHandshakeRequestAndResponsePreV7_6() throws IOException {
|
||||
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
|
||||
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
new TransportHandshaker.HandshakeResponse(Version.V_7_5_0, Version.V_7_5_0, clusterName, localNode).writeTo(out);
|
||||
TransportHandshaker.HandshakeResponse response = handler.read(out.bytes().streamInput());
|
||||
assertEquals(Version.V_7_5_0, response.getVersion());
|
||||
// When writing or reading a 6.6 stream, these are not serialized
|
||||
assertNull(response.getDiscoveryNode());
|
||||
assertNull(response.getClusterName());
|
||||
}
|
||||
}
|
||||
|
||||
public void testHandshakeRequestFutureVersionsCompatibility() throws IOException {
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
|
||||
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());
|
||||
|
||||
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
TcpChannel mockChannel = mock(TcpChannel.class);
|
||||
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT);
|
||||
|
@ -162,15 +131,15 @@ public class TransportHandshakerTests extends ESTestCase {
|
|||
|
||||
TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue();
|
||||
|
||||
assertEquals(Version.CURRENT, response.getVersion());
|
||||
assertEquals(Version.CURRENT, response.getResponseVersion());
|
||||
}
|
||||
|
||||
public void testHandshakeError() throws IOException {
|
||||
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
|
||||
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
assertFalse(versionFuture.isDone());
|
||||
|
||||
|
@ -186,9 +155,9 @@ public class TransportHandshakerTests extends ESTestCase {
|
|||
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
Version compatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
|
||||
doThrow(new IOException("boom")).when(requestSender).sendRequest(remoteNode, channel, reqId, compatibilityVersion);
|
||||
doThrow(new IOException("boom")).when(requestSender).sendRequest(node, channel, reqId, compatibilityVersion);
|
||||
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
|
||||
|
||||
assertTrue(versionFuture.isDone());
|
||||
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
|
||||
|
@ -199,9 +168,9 @@ public class TransportHandshakerTests extends ESTestCase {
|
|||
public void testHandshakeTimeout() throws IOException {
|
||||
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
|
||||
long reqId = randomLongBetween(1, 10);
|
||||
handshaker.sendHandshake(reqId, remoteNode, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
|
||||
handshaker.sendHandshake(reqId, node, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
|
||||
|
||||
verify(requestSender).sendRequest(remoteNode, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
|
||||
assertThat(cte.getMessage(), containsString("handshake_timeout"));
|
||||
|
|
|
@ -257,10 +257,6 @@ public class MockTransport implements Transport, LifecycleComponent {
|
|||
return requestHandlers.get(action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLocalNode(DiscoveryNode localNode) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMessageListener(TransportMessageListener listener) {
|
||||
if (this.listener != null) {
|
||||
|
|
|
@ -101,11 +101,6 @@ public final class StubbableTransport implements Transport {
|
|||
delegate.setMessageListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLocalNode(DiscoveryNode localNode) {
|
||||
delegate.setLocalNode(localNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
|
||||
delegate.registerRequestHandler(reg);
|
||||
|
|
Loading…
Reference in New Issue