Transport client: Don't validate node in handshake (#30737)

This is related to #30141. Right now in the transport client we open a
temporary node connection and take the node information. This node
information is used to open a permanent connection that is used for the
client. However, we continue to use the configured transport address.
If the configured transport address is a load balancer, you might
connect to a different node for the permanent connection. This causes
the handshake validation to fail. This commit removes the handshake
validation for the transport client when it simple node sample mode.
This commit is contained in:
Tim Brooks 2018-05-31 13:14:28 -06:00 committed by GitHub
parent d9ab1469c9
commit 4f66b9a27c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 13 deletions

View File

@ -21,7 +21,7 @@ package org.elasticsearch.client.transport;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
private final Object mutex = new Object();
private volatile List<DiscoveryNode> nodes = Collections.emptyList();
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();
private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
@ -268,7 +270,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
private volatile int i;
RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
@ -361,10 +363,10 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
protected abstract void doSample();
/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
* node returned in the handshake response is different than the discovery node.
*/
protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
@ -380,7 +382,6 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
return Collections.unmodifiableList(new ArrayList<>(nodes));
}
}
class ScheduledNodeSampler implements Runnable {
@ -402,14 +403,16 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
public LivenessResponse read(StreamInput in) throws IOException {
LivenessResponse response = new LivenessResponse();
response.readFrom(in);
return response;
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
@ -435,8 +438,8 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
}
}
nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
}
}
@ -557,7 +560,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
}
}
nodes = validateNewNodes(newNodes);
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
@ -124,6 +126,8 @@ public class TransportService extends AbstractLifecycleComponent {
private final RemoteClusterService remoteClusterService;
private final boolean validateConnections;
/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@ -153,6 +157,9 @@ public class TransportService extends AbstractLifecycleComponent {
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders) {
super(settings);
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
@ -314,6 +321,11 @@ public class TransportService extends AbstractLifecycleComponent {
return isLocalNode(node) || transport.nodeConnected(node);
}
/**
* Connect to the specified node with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, null);
}
@ -331,7 +343,7 @@ public class TransportService extends AbstractLifecycleComponent {
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
if (node.equals(remote) == false) {
if (validateConnections && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
});

View File

@ -20,6 +20,8 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -178,6 +180,42 @@ public class TransportServiceHandshakeTests extends ESTestCase {
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
public void testNodeConnectWithDifferentNodeIdSucceedsIfThisIsTransportClientOfSimpleNodeSampler() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}
public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
.put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
.build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
});
assertThat(ex.getMessage(), containsString("unexpected remote node"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
private static class NetworkHandle {
private TransportService transportService;