CCS: don't proxy requests for already connected node (#31273)
Cross-cluster search selects a subset of nodes for each remote cluster and sends requests only to them; these nodes act as proxies and redirect such requests to the target nodes that hold the relevant data. Today, every time we send a request to a remote cluster, it is sent to the next node in the proxy list (in round-robin fashion), regardless of whether the target node is already among the ones that we are connected to. For instance, if we need to send a shard search request to a data node that is also one of the selected proxy nodes, we may end up sending the request to it through one of the other proxy nodes. This commit optimizes this case: whenever we are already connected to a remote node, we send the request directly to it rather than through the next proxy node. A side effect is that round-robin will be a bit unbalanced, as the data nodes that are also selected as proxies will receive more requests.
This commit is contained in:
parent
018d3fc81f
commit
664903a70a
|
@ -50,7 +50,6 @@ import org.elasticsearch.transport.RemoteClusterService;
|
|||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -351,8 +350,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
|
||||
OriginalIndices localIndices,
|
||||
List<SearchShardIterator> remoteShardIterators) {
|
||||
List<SearchShardIterator> shards = new ArrayList<>();
|
||||
shards.addAll(remoteShardIterators);
|
||||
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
|
||||
for (ShardIterator shardIterator : localShardsIterator) {
|
||||
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
|
||||
}
|
||||
|
@ -384,7 +382,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
|
||||
return new SearchPhase(action.getName()) {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
public void run() {
|
||||
action.start();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -40,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -50,7 +50,6 @@ import java.util.Collections;
|
|||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -181,7 +179,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
|
||||
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
|
||||
final ActionListener<ClusterSearchShardsResponse> listener) {
|
||||
final DiscoveryNode node = connectedNodes.get();
|
||||
final DiscoveryNode node = connectedNodes.getAny();
|
||||
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
|
||||
new TransportResponseHandler<ClusterSearchShardsResponse>() {
|
||||
|
||||
|
@ -217,7 +215,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
request.clear();
|
||||
request.nodes(true);
|
||||
request.local(true); // run this on the node that gets the request it's as good as any other
|
||||
final DiscoveryNode node = connectedNodes.get();
|
||||
final DiscoveryNode node = connectedNodes.getAny();
|
||||
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||
new TransportResponseHandler<ClusterStateResponse>() {
|
||||
@Override
|
||||
|
@ -255,40 +253,52 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
|
||||
* given node.
|
||||
* Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
|
||||
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
|
||||
*/
|
||||
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
|
||||
DiscoveryNode discoveryNode = connectedNodes.get();
|
||||
if (transportService.nodeConnected(remoteClusterNode)) {
|
||||
return transportService.getConnection(remoteClusterNode);
|
||||
}
|
||||
DiscoveryNode discoveryNode = connectedNodes.getAny();
|
||||
Transport.Connection connection = transportService.getConnection(discoveryNode);
|
||||
return new Transport.Connection() {
|
||||
@Override
|
||||
public DiscoveryNode getNode() {
|
||||
return remoteClusterNode;
|
||||
}
|
||||
return new ProxyConnection(connection, remoteClusterNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
static final class ProxyConnection implements Transport.Connection {
|
||||
private final Transport.Connection proxyConnection;
|
||||
private final DiscoveryNode targetNode;
|
||||
|
||||
private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
|
||||
this.proxyConnection = proxyConnection;
|
||||
this.targetNode = targetNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DiscoveryNode getNode() {
|
||||
return targetNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws IOException, TransportException {
|
||||
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
|
||||
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
|
||||
}
|
||||
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
|
||||
TransportActionProxy.wrapRequest(targetNode, request), options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert false: "proxy connections must not be closed";
|
||||
}
|
||||
@Override
|
||||
public void close() {
|
||||
assert false: "proxy connections must not be closed";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return connection.getVersion();
|
||||
}
|
||||
};
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return proxyConnection.getVersion();
|
||||
}
|
||||
}
|
||||
|
||||
Transport.Connection getConnection() {
|
||||
DiscoveryNode discoveryNode = connectedNodes.get();
|
||||
return transportService.getConnection(discoveryNode);
|
||||
return transportService.getConnection(getAnyConnectedNode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -385,7 +395,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
protected void doRun() {
|
||||
ActionListener<Void> listener = ActionListener.wrap((x) -> {
|
||||
synchronized (queue) {
|
||||
running.release();
|
||||
|
@ -590,8 +600,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
return connectedNodes.contains(node);
|
||||
}
|
||||
|
||||
DiscoveryNode getConnectedNode() {
|
||||
return connectedNodes.get();
|
||||
DiscoveryNode getAnyConnectedNode() {
|
||||
return connectedNodes.getAny();
|
||||
}
|
||||
|
||||
void addConnectedNode(DiscoveryNode node) {
|
||||
|
@ -612,7 +622,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
return connectedNodes.size();
|
||||
}
|
||||
|
||||
private static class ConnectedNodes implements Supplier<DiscoveryNode> {
|
||||
private static final class ConnectedNodes {
|
||||
|
||||
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
|
||||
private final String clusterAlias;
|
||||
|
@ -623,8 +633,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
this.clusterAlias = clusterAlias;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized DiscoveryNode get() {
|
||||
public synchronized DiscoveryNode getAny() {
|
||||
ensureIteratorAvailable();
|
||||
if (currentIterator.hasNext()) {
|
||||
return currentIterator.next();
|
||||
|
@ -657,15 +666,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
return nodeSet.contains(node);
|
||||
}
|
||||
|
||||
synchronized Optional<DiscoveryNode> getAny() {
|
||||
ensureIteratorAvailable();
|
||||
if (currentIterator.hasNext()) {
|
||||
return Optional.of(currentIterator.next());
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void ensureIteratorAvailable() {
|
||||
if (currentIterator == null) {
|
||||
currentIterator = nodeSet.iterator();
|
||||
|
|
|
@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.instanceOf;
|
|||
import static org.hamcrest.Matchers.iterableWithSize;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class RemoteClusterConnectionTests extends ESTestCase {
|
||||
|
@ -992,7 +993,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
barrier.await();
|
||||
for (int j = 0; j < numGetCalls; j++) {
|
||||
try {
|
||||
DiscoveryNode node = connection.getConnectedNode();
|
||||
DiscoveryNode node = connection.getAnyConnectedNode();
|
||||
assertNotNull(node);
|
||||
} catch (IllegalStateException e) {
|
||||
if (e.getMessage().startsWith("No node available for cluster:") == false) {
|
||||
|
@ -1053,10 +1054,10 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool,
|
||||
settings);
|
||||
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
|
||||
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
|
||||
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
|
||||
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
|
||||
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
|
||||
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
|
||||
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
|
||||
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
|
@ -1093,4 +1094,76 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetConnection() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
|
||||
|
||||
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
|
||||
assertThat(connectedNode, notNullValue());
|
||||
knownNodes.add(connectedNode);
|
||||
|
||||
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
|
||||
assertThat(disconnectedNode, notNullValue());
|
||||
knownNodes.add(disconnectedNode);
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
Transport.Connection seedConnection = new Transport.Connection() {
|
||||
@Override
|
||||
public DiscoveryNode getNode() {
|
||||
return connectedNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws TransportException {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// no-op
|
||||
}
|
||||
};
|
||||
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
|
||||
@Override
|
||||
public Connection getConnection(DiscoveryNode node) {
|
||||
if (node == connectedNode) {
|
||||
return seedConnection;
|
||||
}
|
||||
return super.getConnection(node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeConnected(DiscoveryNode node) {
|
||||
return node.equals(connectedNode);
|
||||
}
|
||||
});
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
|
||||
connection.addConnectedNode(connectedNode);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
//always a direct connection as the remote node is already connected
|
||||
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
|
||||
assertSame(seedConnection, remoteConnection);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
//always a direct connection as the remote node is already connected
|
||||
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
|
||||
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
|
||||
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
//always a proxy connection as the target node is not connected
|
||||
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
|
||||
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
|
||||
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue