Cross Cluster Search: do not use dedicated masters as gateways (#30926)

When we are connecting to a remote cluster we should never select
dedicated master nodes as gateway nodes, or we will end up loading them
with requests that should rather go to other type of nodes e.g. data
nodes or coord_only nodes.

This commit adds the selection based on the node role, to the existing
selection based on version and potential node attributes.

Closes #30687
This commit is contained in:
Luca Cavanna 2018-05-30 12:32:41 +02:00 committed by GitHub
parent 6341d101d2
commit 3c21e46fa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 199 additions and 11 deletions

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.transport;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
@ -27,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
@ -36,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
@ -97,6 +97,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Setting.affixKeySetting("search.remote.", "skip_unavailable",
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
private final TransportService transportService;
private final int numRemoteConnections;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
@ -121,13 +124,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
connectionListener.onResponse(null);
} else {
CountDown countDown = new CountDown(seeds.size());
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
// cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
@ -143,7 +139,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
nodePredicate);
getNodePredicate(settings));
remoteClusters.put(entry.getKey(), remote);
}
@ -168,6 +164,15 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
}
static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
return DEFAULT_NODE_PREDICATE;
}
/**
* Returns <code>true</code> if at least one remote cluster is configured
*/

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.transport;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
@ -30,7 +29,9 @@ import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -40,6 +41,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -50,6 +52,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
@ -279,6 +282,75 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
public void testRemoteNodeRoles() throws IOException, InterruptedException {
final Settings settings = Settings.EMPTY;
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
final Settings data = Settings.builder().put("node.master", false).build();
final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build();
try (MockTransportService c1N1 =
startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
MockTransportService c1N2 =
startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data);
MockTransportService c2N1 =
startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
MockTransportService c2N2 =
startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) {
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
knownNodes.add(c1N1Node);
knownNodes.add(c1N2Node);
knownNodes.add(c2N1Node);
knownNodes.add(c2N2Node);
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(
settings,
Version.CURRENT,
threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
connectionListener(secondLatch));
secondLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
}
}
}
}
private ActionListener<Void> connectionListener(final CountDownLatch latch) {
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
}
@ -630,4 +702,115 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
}
public void testGetNodePredicateNodeRoles() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
{
DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT);
assertTrue(nodePredicate.test(all));
}
{
DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertTrue(nodePredicate.test(dataMaster));
}
{
DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertFalse(nodePredicate.test(dedicatedMaster));
}
{
DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT);
assertTrue(nodePredicate.test(dedicatedIngest));
}
{
DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertTrue(nodePredicate.test(masterIngest));
}
{
DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT);
assertTrue(nodePredicate.test(dedicatedData));
}
{
DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT);
assertTrue(nodePredicate.test(ingestData));
}
{
DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT);
assertTrue(nodePredicate.test(coordOnly));
}
}
public void testGetNodePredicateNodeVersion() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
Version version = VersionUtils.randomVersion(random());
DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version);
assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version)));
}
public void testGetNodePredicateNodeAttrs() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
{
DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
roles, Version.CURRENT);
assertFalse(nodePredicate.test(nonGatewayNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode));
}
{
DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
roles, Version.CURRENT);
assertTrue(nodePredicate.test(gatewayNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode));
}
{
DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT);
assertFalse(nodePredicate.test(noAttrNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode));
}
}
public void testGetNodePredicatesCombination() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
Set<DiscoveryNode.Role> allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Set<DiscoveryNode.Role> dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER));
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
allRoles, Version.CURRENT);
assertTrue(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
allRoles, Version.V_5_3_0);
assertFalse(nodePredicate.test(node));
}
}
}