Remove handshake from transport client

This commit removes handshaking from the transport client. This
handshaking is not needed because of the existence of the liveness
check.

Relates #18174
This commit is contained in:
Jason Tedor 2016-05-06 09:17:18 -04:00
parent e839dad978
commit e90d00ffce
6 changed files with 32 additions and 20 deletions

View File

@ -360,7 +360,7 @@ public class TransportClientNodesService extends AbstractComponent {
try {
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName);
transportService.connectToNodeLight(listedNode);
} catch (Throwable e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
newFilteredNodes.add(listedNode);
@ -435,7 +435,7 @@ public class TransportClientNodesService extends AbstractComponent {
} else {
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode, pingTimeout, !ignoreClusterName);
transportService.connectToNodeLight(listedNode);
}
} catch (Exception e) {
logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);

View File

@ -402,7 +402,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNodeLight(finalNodeToSend, timeout.getMillis());
transportService.connectToNodeLightAndHandshake(finalNodeToSend, timeout.getMillis());
} else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNode(finalNodeToSend);

View File

@ -277,6 +277,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
transport.connectToNode(node);
}
/**
* Lightly connect to the specified node
*
* @param node the node to connect to
*/
public void connectToNodeLight(final DiscoveryNode node) {
if (node.equals(localNode)) {
return;
}
transport.connectToNodeLight(node);
}
/**
* Lightly connect to the specified node, and handshake cluster
* name and version
@ -287,8 +299,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
* @throws ConnectTransportException if the connection or the
* handshake failed
*/
public DiscoveryNode connectToNodeLight(final DiscoveryNode node, final long handshakeTimeout) throws ConnectTransportException {
return connectToNodeLight(node, handshakeTimeout, true);
public DiscoveryNode connectToNodeLightAndHandshake(
final DiscoveryNode node,
final long handshakeTimeout) throws ConnectTransportException {
return connectToNodeLightAndHandshake(node, handshakeTimeout, true);
}
/**
@ -305,7 +319,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
* @throws ConnectTransportException if the connection or the
* handshake failed
*/
public DiscoveryNode connectToNodeLight(final DiscoveryNode node, final long handshakeTimeout, final boolean checkClusterName) {
public DiscoveryNode connectToNodeLightAndHandshake(
final DiscoveryNode node,
final long handshakeTimeout,
final boolean checkClusterName) {
if (node.equals(localNode)) {
return localNode;
}
@ -353,7 +370,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
localNode != null ? localNode.getVersion().minimumCompatibilityVersion() : Version.CURRENT.minimumCompatibilityVersion());
}
public static class HandshakeRequest extends TransportRequest {
static class HandshakeRequest extends TransportRequest {
public static final HandshakeRequest INSTANCE = new HandshakeRequest();
@ -362,7 +379,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public static class HandshakeResponse extends TransportResponse {
static class HandshakeResponse extends TransportResponse {
private DiscoveryNode discoveryNode;
private ClusterName clusterName;
private Version version;

View File

@ -74,11 +74,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
if (action.equals(TransportLivenessAction.NAME)) {
transportResponseHandler.handleResponse(new LivenessResponse(clusterName, node));
} else {
transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, Version.CURRENT));
}
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.DEFAULT, node));
return;
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
@ -1179,7 +1178,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
try {
serviceB.connectToNodeLight(nodeA, 100);
serviceB.connectToNodeLightAndHandshake(nodeA, 100);
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well
@ -1239,7 +1238,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
try {
serviceB.connectToNodeLight(nodeA, 100);
serviceB.connectToNodeLightAndHandshake(nodeA, 100);
fail("exception should be thrown");
} catch (ConnectTransportException e) {
// all is well

View File

@ -109,7 +109,7 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
test);
DiscoveryNode connectedNode =
handleA.transportService.connectToNodeLight(
handleA.transportService.connectToNodeLightAndHandshake(
new DiscoveryNode(
"",
handleB.discoveryNode.getAddress(),
@ -131,7 +131,7 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT, new ClusterName("b"));
try {
handleA.transportService.connectToNodeLight(
handleA.transportService.connectToNodeLightAndHandshake(
new DiscoveryNode(
"",
handleB.discoveryNode.getAddress(),
@ -154,7 +154,7 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
startServices("TS_B", settings, VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()), test);
try {
handleA.transportService.connectToNodeLight(
handleA.transportService.connectToNodeLightAndHandshake(
new DiscoveryNode(
"",
handleB.discoveryNode.getAddress(),
@ -180,7 +180,7 @@ public class NettyTransportServiceHandshakeTests extends ESTestCase {
new ClusterName("b")
);
DiscoveryNode connectedNode = handleA.transportService.connectToNodeLight(
DiscoveryNode connectedNode = handleA.transportService.connectToNodeLightAndHandshake(
new DiscoveryNode(
"",
handleB.discoveryNode.getAddress(),