Ensure we don't use a remote profile if cluster name matches (#31331)
If we are running into a race condition between a node being configured to be a remote node for cross cluster search etc. and that node joining the cluster we might connect to that node with a remote profile. If that node now joins the cluster it connected to it as a CCS remote node we use the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation but is the safe option while potentially only loose a small optimization of using less connections per node which is small anyways since we only connect to a small set of nodes. Closes #29321
This commit is contained in:
parent
5b94afd309
commit
3d5f113ada
|
@ -87,6 +87,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
private volatile boolean skipUnavailable;
|
||||
private final ConnectHandler connectHandler;
|
||||
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
|
||||
private final ClusterName localClusterName;
|
||||
|
||||
/**
|
||||
* Creates a new {@link RemoteClusterConnection}
|
||||
|
@ -100,6 +101,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
|
||||
super(settings);
|
||||
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
|
||||
this.transportService = transportService;
|
||||
this.maxNumRemoteConnections = maxNumRemoteConnections;
|
||||
this.nodePredicate = nodePredicate;
|
||||
|
@ -310,6 +312,21 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
return connectHandler.isClosed();
|
||||
}
|
||||
|
||||
private ConnectionProfile getRemoteProfile(ClusterName name) {
|
||||
// we can only compare the cluster name to make a decision if we should use a remote profile
|
||||
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
|
||||
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
|
||||
// rather smallish optimization on the connection layer under certain situations where remote clusters
|
||||
// have the same name as the local one is minor here.
|
||||
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
|
||||
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
|
||||
if (this.localClusterName.equals(name)) {
|
||||
return null;
|
||||
} else {
|
||||
return remoteProfile;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The connect handler manages node discovery and the actual connect to the remote cluster.
|
||||
* There is at most one connect job running at any time. If such a connect job is triggered
|
||||
|
@ -419,7 +436,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
|
||||
|
@ -431,21 +447,27 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
if (seedNodes.hasNext()) {
|
||||
cancellableThreads.executeIO(() -> {
|
||||
final DiscoveryNode seedNode = seedNodes.next();
|
||||
final DiscoveryNode handshakeNode;
|
||||
final TransportService.HandshakeResponse handshakeResponse;
|
||||
Transport.Connection connection = transportService.openConnection(seedNode,
|
||||
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
|
||||
boolean success = false;
|
||||
try {
|
||||
try {
|
||||
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
|
||||
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
|
||||
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
|
||||
} catch (IllegalStateException ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
|
||||
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
|
||||
throw ex;
|
||||
}
|
||||
|
||||
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
|
||||
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
transportService.connectToNode(handshakeNode, remoteProfile);
|
||||
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
|
||||
if (remoteClusterName.get() == null) {
|
||||
assert handshakeResponse.getClusterName().value() != null;
|
||||
remoteClusterName.set(handshakeResponse.getClusterName());
|
||||
}
|
||||
connectedNodes.add(handshakeNode);
|
||||
}
|
||||
ClusterStateRequest request = new ClusterStateRequest();
|
||||
|
@ -552,7 +574,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
for (DiscoveryNode node : nodesIter) {
|
||||
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
try {
|
||||
transportService.connectToNode(node, remoteProfile); // noop if node is connected
|
||||
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
|
||||
// connected
|
||||
connectedNodes.add(node);
|
||||
} catch (ConnectTransportException | IllegalStateException ex) {
|
||||
// ISE if we fail the handshake with an version incompatible node
|
||||
|
|
|
@ -342,7 +342,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
|
||||
// We don't validate cluster names to allow for CCS connections.
|
||||
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
|
||||
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
|
||||
if (validateConnections && node.equals(remote) == false) {
|
||||
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
|
||||
}
|
||||
|
@ -378,7 +378,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
public DiscoveryNode handshake(
|
||||
final Transport.Connection connection,
|
||||
final long handshakeTimeout) throws ConnectTransportException {
|
||||
return handshake(connection, handshakeTimeout, clusterName::equals);
|
||||
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -390,11 +390,11 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
* @param connection the connection to a specific node
|
||||
* @param handshakeTimeout handshake timeout
|
||||
* @param clusterNamePredicate cluster name validation predicate
|
||||
* @return the connected node
|
||||
* @return the handshake response
|
||||
* @throws ConnectTransportException if the connection failed
|
||||
* @throws IllegalStateException if the handshake failed
|
||||
*/
|
||||
public DiscoveryNode handshake(
|
||||
public HandshakeResponse handshake(
|
||||
final Transport.Connection connection,
|
||||
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
|
||||
final HandshakeResponse response;
|
||||
|
@ -420,7 +420,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
|
||||
}
|
||||
|
||||
return response.discoveryNode;
|
||||
return response;
|
||||
}
|
||||
|
||||
static class HandshakeRequest extends TransportRequest {
|
||||
|
@ -461,6 +461,14 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
clusterName.writeTo(out);
|
||||
Version.writeVersion(version, out);
|
||||
}
|
||||
|
||||
public DiscoveryNode getDiscoveryNode() {
|
||||
return discoveryNode;
|
||||
}
|
||||
|
||||
public ClusterName getClusterName() {
|
||||
return clusterName;
|
||||
}
|
||||
}
|
||||
|
||||
public void disconnectFromNode(DiscoveryNode node) {
|
||||
|
|
|
@ -142,6 +142,102 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedNode));
|
||||
assertTrue(service.nodeConnected(seedNode));
|
||||
assertTrue(service.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
|
||||
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
|
||||
@Override
|
||||
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
|
||||
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
|
||||
inst.readFrom(in);
|
||||
return inst;
|
||||
}
|
||||
});
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
|
||||
.build();
|
||||
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
|
||||
options, futureHandler);
|
||||
futureHandler.txGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
|
||||
Settings.builder().put("cluster.name", "foobar").build());
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
|
||||
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedNode));
|
||||
assertTrue(service.nodeConnected(seedNode));
|
||||
assertTrue(service.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
|
||||
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
|
||||
@Override
|
||||
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
|
||||
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
|
||||
inst.readFrom(in);
|
||||
return inst;
|
||||
}
|
||||
});
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
|
||||
.build();
|
||||
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
|
||||
service.sendRequest(discoverableNode,
|
||||
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
|
||||
futureHandler.txGet();
|
||||
}).getCause();
|
||||
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
|
||||
|
||||
PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
|
||||
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
|
||||
@Override
|
||||
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
|
||||
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
|
||||
inst.readFrom(in);
|
||||
return inst;
|
||||
}
|
||||
});
|
||||
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
|
||||
.build();
|
||||
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
|
||||
ops, handler);
|
||||
handler.txGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testDiscoverSingleNode() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
|
|
|
@ -191,7 +191,22 @@ public class MockTcpTransport extends TcpTransport {
|
|||
@Override
|
||||
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
|
||||
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE);
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
|
||||
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
|
||||
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
|
||||
Set<TransportRequestOptions.Type> types = handle.getTypes();
|
||||
if (handle.length > 0) {
|
||||
allTypesWithConnection.addAll(types);
|
||||
} else {
|
||||
allTypesWithoutConnection.addAll(types);
|
||||
}
|
||||
}
|
||||
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
|
||||
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
|
||||
if (allTypesWithoutConnection.isEmpty() == false) {
|
||||
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
|
||||
}
|
||||
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
|
||||
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
|
||||
return builder.build();
|
||||
|
|
Loading…
Reference in New Issue