Rebuild remote connections on profile changes (#39146)

Currently remote compression and ping schedule settings are dynamic.
However, we do not listen for changes. This commit adds listeners for
changes to those two settings. Additionally, when those settings change
we now close existing connections and open new ones with the settings
applied.

Fixes #37201.
This commit is contained in:
Tim Brooks 2019-02-21 14:00:39 -07:00 committed by GitHub
parent 34d06471c3
commit 44df76251f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 343 additions and 134 deletions

View File

@ -27,21 +27,16 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster.
[float]
[[configuring-remote-clusters]]
=== Configuring Remote Clusters
=== Configuring remote clusters
Remote clusters can be specified globally using
<<cluster-update-settings,cluster settings>> (which can be updated dynamically),
or local to individual nodes using the `elasticsearch.yml` file.
You can configure remote clusters globally by using
<<cluster-update-settings,cluster settings>>, which you can update dynamically.
Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file.
If a remote cluster is configured via `elasticsearch.yml` only the nodes with
that configuration will be able to connect to the remote cluster. In other
words, functionality that relies on remote cluster requests will have to be
driven specifically from those nodes. Remote clusters set via the
<<cluster-update-settings,cluster settings API>> will be available on every node
in the cluster.
The `elasticsearch.yml` config file for a node that connects to remote clusters
needs to list the remote clusters that should be connected to, for instance:
If you specify the settings in `elasticsearch.yml` files, only the nodes with
those settings can connect to the remote cluster. In other words, functionality
that relies on remote cluster requests must be driven specifically from those
nodes. For example:
[source,yaml]
--------------------------------
@ -49,17 +44,22 @@ cluster:
remote:
cluster_one: <1>
seeds: 127.0.0.1:9300
cluster_two: <1>
transport.ping_schedule: 30s <2>
cluster_two:
seeds: 127.0.0.1:9301
transport.compress: true <3>
--------------------------------
<1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing
the connection to each cluster. These names are subsequently used to distinguish
between local and remote indices.
<2> A keep-alive ping is configured for `cluster_one`.
<3> Compression is explicitly enabled for requests to `cluster_two`.
The equivalent example using the <<cluster-update-settings,cluster settings
API>> to add remote clusters to all nodes in the cluster would look like the
following:
For more information about the optional transport settings, see
<<modules-transport>>.
If you use <<cluster-update-settings,cluster settings>>, the remote clusters are available on every node in the cluster. For example:
[source,js]
--------------------------------
@ -71,12 +71,14 @@ PUT _cluster/settings
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
],
"transport.ping_schedule": "30s"
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
],
"transport.compress": true
},
"cluster_three": {
"seeds": [
@ -92,6 +94,40 @@ PUT _cluster/settings
// TEST[setup:host]
// TEST[s/127.0.0.1:9300/\${transport_host}/]
You can dynamically update the compression and ping schedule settings. However,
you must re-include seeds in the settings update request. For example:
[source,js]
--------------------------------
PUT _cluster/settings
{
"persistent": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
],
"transport.ping_schedule": "60s"
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
],
"transport.compress": false
}
}
}
}
}
--------------------------------
// CONSOLE
// TEST[continued]
NOTE: When the compression or ping schedule settings change, all the existing
node connections must close and re-open, which can cause in-flight requests to
fail.
A remote cluster can be deleted from the cluster settings by setting its seeds
to `null`:

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
@ -38,8 +37,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable {
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final Transport transport;
private final ThreadPool threadPool;
private final ConnectionProfile defaultProfile;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool);
public ConnectionManager(Settings settings, Transport transport) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
}
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) {
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
this.transport = transport;
this.threadPool = threadPool;
this.defaultProfile = connectionProfile;
}
@ -185,35 +180,23 @@ public class ConnectionManager implements Closeable {
@Override
public void close() {
Transports.assertNotTransportThread("Closing ConnectionManager");
if (isClosed.compareAndSet(false, true)) {
CountDownLatch latch = new CountDownLatch(1);
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});
closeLock.writeLock().lock();
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -282,21 +283,38 @@ public abstract class RemoteClusterAware {
return perClusterIndices;
}
void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}
void updateRemoteCluster(String clusterAlias, Settings settings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}
/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule);
/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,

View File

@ -64,9 +64,6 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE;
/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
* current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
@ -107,12 +104,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
* @param proxyAddress the proxy address
* @param connectionProfile the connection profile to use
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress) {
String proxyAddress, ConnectionProfile connectionProfile) {
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
createConnectionManager(settings, clusterAlias, transportService));
createConnectionManager(connectionProfile, transportService));
}
// Public for tests to pass a StubbableConnectionManager
@ -309,13 +307,23 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
@Override
public void close() throws IOException {
IOUtils.close(connectHandler, connectionManager);
IOUtils.close(connectHandler);
// In the ConnectionManager we wait on connections being closed.
threadPool.generic().execute(connectionManager::close);
}
public boolean isClosed() {
return connectHandler.isClosed();
}
public String getProxyAddress() {
return proxyAddress;
}
public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}
/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
@ -697,18 +705,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
}
private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings));
return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool);
private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
return new ConnectionManager(connectionProfile, transportService.transport);
}
ConnectionManager getConnectionManager() {

View File

@ -47,6 +47,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -67,6 +68,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);
private static final ActionListener<Void> noopListener = ActionListener.wrap((x) -> {}, (x) -> {});
static {
// remove search.remote.* settings in 8.0.0
assert Version.CURRENT.major < 8;
@ -185,6 +188,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
private final TransportService transportService;
private final int numRemoteConnections;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
private volatile Map<String, ConnectionProfile> remoteClusterConnectionProfiles = Collections.emptyMap();
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
@ -212,21 +216,33 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
List<Tuple<String, Supplier<DiscoveryNode>>> seedList = entry.getValue().v2();
String proxyAddress = entry.getValue().v1();
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
String clusterAlias = entry.getKey();
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
ConnectionProfile connectionProfile = this.remoteClusterConnectionProfiles.get(clusterAlias);
if (seedList.isEmpty()) { // with no seed nodes we just remove the connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e);
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(entry.getKey());
remoteClusters.remove(clusterAlias);
continue;
}
if (remote == null) { // this is a new cluster we have to add a new representation
String clusterAlias = entry.getKey();
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections,
getNodePredicate(settings), proxyAddress);
getNodePredicate(settings), proxyAddress, connectionProfile);
remoteClusters.put(clusterAlias, remote);
} else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) {
// New ConnectionProfile. Must tear down existing connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
}
remoteClusters.remove(clusterAlias);
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections,
getNodePredicate(settings), proxyAddress, connectionProfile);
remoteClusters.put(clusterAlias, remote);
}
@ -243,7 +259,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
connectionListener.onFailure(exception);
}
if (finalRemote.isClosed() == false) {
logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception);
logger.warn("failed to update seed list for cluster: " + clusterAlias, exception);
}
}));
}
@ -361,19 +377,36 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress, boolean compressionEnabled,
TimeValue pingSchedule) {
if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias);
ConnectionProfile newProfile;
if (oldProfile != null) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile);
builder.setCompressionEnabled(compressionEnabled);
builder.setPingInterval(pingSchedule);
newProfile = builder.build();
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias));
builder.setCompressionEnabled(compressionEnabled);
builder.setPingInterval(pingSchedule);
newProfile = builder.build();
}
updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener);
}
void updateRemoteCluster(
final String clusterAlias,
final List<String> addresses,
final String proxyAddress,
final ActionListener<Void> connectionListener) {
void updateRemoteCluster(final String clusterAlias, final List<String> addresses, final String proxyAddress,
final ConnectionProfile connectionProfile, final ActionListener<Void> connectionListener) {
HashMap<String, ConnectionProfile> connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles);
connectionProfiles.put(clusterAlias, connectionProfile);
this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles);
final List<Tuple<String, Supplier<DiscoveryNode>>> nodes =
addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
).collect(Collectors.toList());
addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
).collect(Collectors.toList());
updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
}
@ -386,6 +419,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> seeds =
RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
initializeConnectionProfiles(seeds.keySet());
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
@ -398,6 +432,32 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
}
private synchronized void initializeConnectionProfiles(Set<String> remoteClusters) {
Map<String, ConnectionProfile> connectionProfiles = new HashMap<>(remoteClusters.size());
for (String clusterName : remoteClusters) {
connectionProfiles.put(clusterName, buildConnectionProfileFromSettings(clusterName));
}
this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles);
}
private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) {
return buildConnectionProfileFromSettings(settings, clusterName);
}
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
return new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
.setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
.build();
}
@Override
public void close() throws IOException {
IOUtils.close(remoteClusters.values());
@ -407,6 +467,11 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
}
private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) {
return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false
|| Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
}
/**
* Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
* function on success.

View File

@ -149,7 +149,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders) {
this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
new ConnectionManager(settings, transport, threadPool));
new ConnectionManager(settings, transport));
}
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,

View File

@ -211,7 +211,7 @@ public class PeerFinderTests extends ESTestCase {
localNode = newDiscoveryNode("local-node");
ConnectionManager innerConnectionManager
= new ConnectionManager(settings, capturingTransport, deterministicTaskQueue.getThreadPool());
= new ConnectionManager(settings, capturingTransport);
StubbableConnectionManager connectionManager
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport, deterministicTaskQueue.getThreadPool());
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {

View File

@ -55,7 +55,7 @@ public class ConnectionManagerTests extends ESTestCase {
.build();
threadPool = new ThreadPool(settings);
transport = mock(Transport.class);
connectionManager = new ConnectionManager(settings, transport, threadPool);
connectionManager = new ConnectionManager(settings, transport);
TimeValue oneSecond = new TimeValue(1000);
TimeValue oneMinute = TimeValue.timeValueMinutes(1);
connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond,

View File

@ -104,6 +104,7 @@ import static org.hamcrest.Matchers.startsWith;
public class RemoteClusterConnectionTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster");
@Override
public void tearDown() throws Exception {
@ -191,7 +192,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null,
profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
assertTrue(connectionManager.nodeConnected(seedNode));
@ -233,7 +235,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null,
profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
assertTrue(connectionManager.nodeConnected(seedNode));
@ -286,7 +289,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null,
profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
assertTrue(connectionManager.nodeConnected(seedNode));
@ -318,7 +322,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes);
assertTrue(connectionManager.nodeConnected(seedNode));
@ -346,7 +350,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
assertTrue(connectionManager.nodeConnected(seedNode));
@ -396,7 +400,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
if (rejectedNode.equals(seedNode)) {
@ -461,7 +465,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null,
profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
expectThrows(
Exception.class,
@ -502,7 +507,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
};
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool);
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport,
threadPool);
@ -559,7 +564,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
fail("expected exception");
@ -614,7 +619,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
@ -694,7 +699,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
@ -782,7 +787,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
int maxNumConnections = randomIntBetween(1, 5);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, maxNumConnections, n -> true, null)) {
seedNodes, service, maxNumConnections, n -> true, null, profile)) {
// test no nodes connected
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
assertNotNull(remoteConnectionInfo);
@ -914,7 +919,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
assertFalse(connectionManager.nodeConnected(seedNode));
assertFalse(connectionManager.nodeConnected(discoverableNode));
@ -964,7 +969,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
if (randomBoolean()) {
updateSeedNodes(connection, seedNodes(seedNode));
}
@ -1012,7 +1017,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) {
final int numGetThreads = randomIntBetween(4, 10);
final Thread[] getThreads = new Thread[numGetThreads];
final int numModifyingThreads = randomIntBetween(4, 10);
@ -1100,7 +1105,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) {
ConnectionManager connectionManager = connection.getConnectionManager();
updateSeedNodes(connection, seedNodes(seedNode));
assertTrue(connectionManager.nodeConnected(seedNode));
@ -1156,7 +1161,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
};
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool);
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport,
threadPool);
@ -1214,7 +1219,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
return seedNode;
});
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) {
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null, profile)) {
updateSeedNodes(connection, Arrays.asList(seedSupplier));
// Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes
// being called again so we try to resolve the same seed node's host twice
@ -1246,7 +1251,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true));
assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name"));
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) {
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress, profile)) {
updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress);
assertEquals(2, connection.getNumNodesConnected());
assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode()));

View File

@ -400,11 +400,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
TimeValue.timeValueSeconds(randomIntBetween(1, 10));
builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");
assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval());
@ -415,6 +411,40 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
public void testChangeSettings() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) {
DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode();
knownNodes.add(cluster1Transport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT,
threadPool, null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
Settings.Builder settingsChange = Settings.builder();
TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8));
settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
boolean compressionEnabled = true;
settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
service.updateRemoteCluster("cluster_1", settingsChange.build());
assertBusy(remoteClusterConnection::isClosed);
remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile();
assertEquals(pingSchedule, connectionProfile.getPingInterval());
assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled());
}
}
}
}
public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("cluster.remote.node.attr", "gateway").build();
@ -460,14 +490,14 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
genericProfile("cluster_1"), connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
genericProfile("cluster_2"), connectionListener(secondLatch));
secondLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
@ -525,14 +555,14 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
genericProfile("cluster_1"), connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
genericProfile("cluster_2"), connectionListener(secondLatch));
secondLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
@ -595,17 +625,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertFalse(service.isCrossClusterSearchEnabled());
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
service.updateRemoteCluster("cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
genericProfile("cluster_1"), connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
genericProfile("cluster_2"), connectionListener(secondLatch));
secondLatch.await();
CountDownLatch latch = new CountDownLatch(1);
service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")),
@ -911,7 +941,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
exceptionAtomicReference.set(x);
latch.countDown();
});
service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener);
service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, genericProfile(clusterAlias), listener);
latch.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
@ -953,4 +983,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
}
private static ConnectionProfile genericProfile(String clusterName) {
return RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, clusterName);
}
}

View File

@ -80,7 +80,7 @@ public class MockTransport implements Transport, LifecycleComponent {
public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));

View File

@ -157,7 +157,7 @@ public final class MockTransportService extends TransportService {
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
new StubbableConnectionManager(new ConnectionManager(settings, transport, threadPool), settings, transport, threadPool));
new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport, threadPool));
this.original = transport.getDelegate();
}

View File

@ -42,7 +42,7 @@ public class StubbableConnectionManager extends ConnectionManager {
private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
super(settings, transport);
this.delegate = delegate;
this.getConnectionBehaviors = new ConcurrentHashMap<>();
this.nodeConnectedBehaviors = new ConcurrentHashMap<>();

View File

@ -12,6 +12,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest;
@ -75,7 +76,8 @@ class CcrRepositoryManager extends AbstractLifecycleComponent {
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
deleteRepository(repositoryName);

View File

@ -50,6 +50,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -68,6 +69,8 @@ import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -1122,6 +1125,69 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
}
public void testUpdateRemoteConfigsDuringFollowing() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
int numberOfReplicas = between(0, 1);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final int firstBatchNumDocs = randomIntBetween(200, 800);
logger.info("Executing put follow");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());
logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs);
try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs,
randomIntBetween(1, 5))) {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
waitForDocs(firstBatchNumDocs, indexer);
indexer.assertNoFailures();
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
for (String docId : indexer.getIds()) {
assertBusy(() -> {
final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get();
assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists());
});
}
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
} finally {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
.put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}
}
private long getFollowTaskSettingsVersion(String followerIndex) {
long settingsVersion = -1L;
for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest;
import org.elasticsearch.transport.RemoteClusterAware;
@ -438,7 +439,8 @@ class IndicesAndAliasesResolver {
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress, boolean compressionEnabled,
TimeValue pingSchedule) {
if (addresses.isEmpty()) {
clusters.remove(clusterAlias);
} else {