diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index fe001e5ad19..7673a02b2d0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.client.Client; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -27,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -36,6 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -97,6 +97,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl Setting.affixKeySetting("search.remote.", "skip_unavailable", key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) + && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); + private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); @@ -121,13 +124,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl connectionListener.onResponse(null); } else { CountDown countDown = new CountDown(seeds.size()); - Predicate nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion()); - if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { - // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for - // cross cluster search - String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); - nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); - } remoteClusters.putAll(this.remoteClusters); for (Map.Entry> entry : seeds.entrySet()) { RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); @@ -143,7 +139,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl if (remote == null) { // this is a new cluster we have to add a new representation remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, - nodePredicate); + getNodePredicate(settings)); remoteClusters.put(entry.getKey(), remote); } @@ -168,6 +164,15 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl this.remoteClusters = Collections.unmodifiableMap(remoteClusters); } + static Predicate getNodePredicate(Settings settings) { + if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { + // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search + String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); + return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); + } + return DEFAULT_NODE_PREDICATE; + } + /** * Returns true if at least one remote cluster is configured */ diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index e221a2fb207..5529f98af33 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -30,7 +29,9 @@ import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -40,6 +41,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,6 +52,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Predicate; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -279,6 +282,75 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testRemoteNodeRoles() throws IOException, InterruptedException { + final Settings settings = Settings.EMPTY; + final List knownNodes = new CopyOnWriteArrayList<>(); + final Settings data = Settings.builder().put("node.master", false).build(); + final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build(); + try (MockTransportService c1N1 = + startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster); + MockTransportService c1N2 = + startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data); + MockTransportService c2N1 = + startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster); + MockTransportService c2N2 = + startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) { + final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode(); + final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode(); + final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode(); + final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode(); + knownNodes.add(c1N1Node); + knownNodes.add(c1N2Node); + knownNodes.add(c2N1Node); + knownNodes.add(c2N2Node); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService( + settings, + Version.CURRENT, + threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + final Settings.Builder builder = Settings.builder(); + builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString()); + builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + + final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); + final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); + final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); + final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); + + final CountDownLatch firstLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_1", + Arrays.asList(c1N1Address, c1N2Address), + connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_2", + Arrays.asList(c2N1Address, c2N2Address), + connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node)); + assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node)); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); + assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + } + } + } + } + private ActionListener connectionListener(final CountDownLatch latch) { return ActionListener.wrap(x -> latch.countDown(), x -> fail()); } @@ -630,4 +702,115 @@ public class RemoteClusterServiceTests extends ESTestCase { } } } + + public void testGetNodePredicateNodeRoles() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); + { + DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT); + assertTrue(nodePredicate.test(all)); + } + { + DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertTrue(nodePredicate.test(dataMaster)); + } + { + DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertFalse(nodePredicate.test(dedicatedMaster)); + } + { + DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedIngest)); + } + { + DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT); + assertTrue(nodePredicate.test(masterIngest)); + } + { + DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedData)); + } + { + DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT); + assertTrue(nodePredicate.test(ingestData)); + } + { + DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(), + new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT); + assertTrue(nodePredicate.test(coordOnly)); + } + } + + public void testGetNodePredicateNodeVersion() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); + Version version = VersionUtils.randomVersion(random()); + DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version); + assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version))); + } + + public void testGetNodePredicateNodeAttrs() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build(); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); + { + DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + roles, Version.CURRENT); + assertFalse(nodePredicate.test(nonGatewayNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode)); + } + { + DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + roles, Version.CURRENT); + assertTrue(nodePredicate.test(gatewayNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode)); + } + { + DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT); + assertFalse(nodePredicate.test(noAttrNode)); + assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode)); + } + } + + public void testGetNodePredicatesCombination() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build(); + Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); + Set allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)); + Set dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)); + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + allRoles, Version.CURRENT); + assertTrue(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + allRoles, Version.V_5_3_0); + assertFalse(nodePredicate.test(node)); + } + } }