From ef5181c6784ab8022563bd5c6cb5eb6ba064b07e Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 31 Oct 2018 17:32:53 +0100 Subject: [PATCH] Allow to enable pings for specific remote clusters (#34753) When we connect to remote clusters, there may be a few more routers/firewalls in-between compared to when we connect to nodes in the same cluster. We've experienced cases where firewalls drop connections completely and keep-alives seem not to be enough, or they are not properly configured. With this commit we allow to enable application-level pings specifically from CCS nodes to the selected remote nodes through the new setting `cluster.remote.${clusterAlias}.transport.ping_schedule`. The new setting is similar `transport.ping_schedule` but it does not affect intra-cluster communication, pings are only sent to specific remote cluster when specifically enabled, as they are disabled by default. Relates to #34405 --- .../modules/remote-clusters.asciidoc | 9 ++ docs/reference/modules/transport.asciidoc | 6 +- .../common/settings/ClusterSettings.java | 1 + .../transport/ConnectionManager.java | 15 +- .../transport/RemoteClusterConnection.java | 13 +- .../transport/RemoteClusterService.java | 39 +++--- .../RemoteClusterConnectionTests.java | 50 +++---- .../transport/RemoteClusterServiceTests.java | 129 ++++++++++++++---- 8 files changed, 177 insertions(+), 85 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 81d882f5f0e..4bf3073abd3 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -152,6 +152,15 @@ PUT _cluster/settings by default, but they can selectively be made optional by setting this setting to `true`. +`cluster.remote.${cluster_alias}.transport.ping_schedule`:: + + Sets the time interval between regular application-level ping messages that + are sent to ensure that transport connections to nodes belonging to remote + clusters are kept alive. If set to `-1`, application-level ping messages to + this remote cluster are not sent. If unset, application-level ping messages + are sent according to the global `transport.ping_schedule` setting, which + defaults to ``-1` meaning that pings are not sent. + [float] [[retrieve-remote-clusters-info]] === Retrieving remote clusters info diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 257181f70c5..c1bc83230e5 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -46,9 +46,9 @@ between all nodes. Defaults to `false`. |`transport.ping_schedule` | Schedule a regular application-level ping message to ensure that transport connections between nodes are kept alive. Defaults to -`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to -correctly configure TCP keep-alives instead of using this feature, because TCP -keep-alives apply to all kinds of long-lived connection and not just to +`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable +to correctly configure TCP keep-alives instead of using this feature, because +TCP keep-alives apply to all kinds of long-lived connections and not just to transport connections. |======================================================================= diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 66a4aa65c44..f72f31772aa 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -293,6 +293,7 @@ public final class ClusterSettings extends AbstractScopedSettings { RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 5f2635fac88..114bb8c986a 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.transport; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings)); + this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings)); } - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { + public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) { this.transport = transport; this.threadPool = threadPool; - this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); - this.defaultProfile = defaultProfile; + this.pingSchedule = pingSchedule; + this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings); this.lifecycle.moveToStarted(); - if (pingSchedule.millis() > 0) { threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing()); } @@ -252,6 +251,10 @@ public class ConnectionManager implements Closeable { } } + TimeValue getPingSchedule() { + return pingSchedule; + } + private class ScheduledPing extends AbstractLifecycleRunnable { private ScheduledPing() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 0cfa2abd754..4dadc362b80 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.transport; -import java.net.InetSocketAddress; -import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; @@ -48,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,6 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -105,13 +105,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param connectionManager the connection manager to use for this remote connection * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to + * @param proxyAddress the proxy address */ - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, - Predicate nodePredicate) { - this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null); - } - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate nodePredicate, String proxyAddress) { @@ -151,7 +146,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress); return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); - } + } } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dc3bd3a3536..08f08207eae 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; +import static org.elasticsearch.common.settings.Setting.timeSetting; /** * Basic service for accessing remote clusters via gateway nodes @@ -166,6 +167,12 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( + "cluster.remote.", + "transport.ping_schedule", + key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope), + REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); @@ -211,10 +218,13 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl } if (remote == null) { // this is a new cluster we have to add a new representation - remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService, - new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections, - getNodePredicate(settings), proxyAddress); - remoteClusters.put(entry.getKey(), remote); + String clusterAlias = entry.getKey(); + TimeValue pingSchedule = REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings); + ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport, + transportService.threadPool, pingSchedule); + remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, connectionManager, + numRemoteConnections, getNodePredicate(settings), proxyAddress); + remoteClusters.put(clusterAlias, remote); } // now update the seed nodes no matter if it's new or already existing @@ -340,31 +350,27 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @throws IllegalArgumentException if the remote cluster is unknown */ public Transport.Connection getConnection(DiscoveryNode node, String cluster) { - RemoteClusterConnection connection = remoteClusters.get(cluster); - if (connection == null) { - throw new IllegalArgumentException("no such remote cluster: " + cluster); - } - return connection.getConnection(node); + return getRemoteClusterConnection(cluster).getConnection(node); } /** * Ensures that the given cluster alias is connected. If the cluster is connected this operation * will invoke the listener immediately. */ - public void ensureConnected(String clusterAlias, ActionListener listener) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterAlias); - } - remoteClusterConnection.ensureConnected(listener); + void ensureConnected(String clusterAlias, ActionListener listener) { + getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } public Transport.Connection getConnection(String cluster) { + return getRemoteClusterConnection(cluster).getConnection(); + } + + RemoteClusterConnection getRemoteClusterConnection(String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { throw new IllegalArgumentException("no such remote cluster: " + cluster); } - return connection.getConnection(); + return connection; } @Override @@ -386,7 +392,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl } } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 6c27680d741..e6f7b449b43 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,9 +18,6 @@ */ package org.elasticsearch.transport; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -68,7 +65,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -79,6 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -163,7 +163,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -204,7 +204,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -256,7 +256,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -285,7 +285,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -312,7 +312,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -362,7 +362,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, - n -> n.equals(rejectedNode) == false)) { + n -> n.equals(rejectedNode) == false, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -422,7 +422,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -485,7 +485,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -528,7 +528,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -565,7 +565,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -605,7 +605,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -659,7 +659,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, + n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -769,7 +770,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -848,7 +849,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -937,7 +938,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) { + seedNodes, service, service.connectionManager(), maxNumConnections, n -> true, null)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -1084,7 +1085,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1133,7 +1134,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } @@ -1181,7 +1182,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1271,7 +1272,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1351,7 +1352,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), + Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1393,7 +1395,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { return seedNode; }; try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 94ac7e963c1..e2a0827c14d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,7 @@ public class RemoteClusterServiceTests extends ESTestCase { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE)); } public void testRemoteClusterSeedSetting() { @@ -194,12 +196,12 @@ public class RemoteClusterServiceTests extends ESTestCase { public void testGroupClusterIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -207,8 +209,8 @@ public class RemoteClusterServiceTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -239,12 +241,12 @@ public class RemoteClusterServiceTests extends ESTestCase { public void testGroupIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -252,8 +254,8 @@ public class RemoteClusterServiceTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -301,12 +303,12 @@ public class RemoteClusterServiceTests extends ESTestCase { public void testIncrementallyAddClusters() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -314,16 +316,16 @@ public class RemoteClusterServiceTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -337,6 +339,81 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testDefaultPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + TimeValue pingSchedule; + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); + if (randomBoolean()) { + pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), pingSchedule).build(); + } else { + pingSchedule = TimeValue.MINUS_ONE; + } + Settings settings = settingsBuilder.build(); + try (MockTransportService transportService = MockTransportService.createNewService(settings, + Version.CURRENT, threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + + public void testCustomPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); + } + Settings transportSettings = settingsBuilder.build(); + + try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, + threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); + TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); + TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getPingSchedule()); + RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getPingSchedule()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build();