Ensure remote strategy settings can be updated (#49812)

This is related to #49067. As part of this work a new sniff number of
node connections setting, a simple addresses setting, and a simple
number of sockets setting have been added. This commit ensures that
these settings are properly hooked up to support dynamic updates.
This commit is contained in:
Tim Brooks 2019-12-05 10:39:57 -07:00 committed by GitHub
parent f4b3bb7d6b
commit b281d64e89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 3 deletions

View File

@ -32,7 +32,9 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -74,6 +76,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
@ -100,6 +103,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
@ -134,7 +138,9 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
return false;
List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
}
@Override
@ -223,4 +229,13 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
}
private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
if (oldAddresses.size() != newAddresses.size()) {
return true;
}
Set<String> oldSeeds = new HashSet<>(oldAddresses);
Set<String> newSeeds = new HashSet<>(newAddresses);
return oldSeeds.equals(newSeeds) == false;
}
}

View File

@ -271,7 +271,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy);
int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return nodeConnections != maxNumRemoteConnections || seedsChanged(configuredSeedNodes, addresses) ||
proxyChanged(proxyAddress, proxy);
}
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
@ -303,6 +304,59 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
public void testSimpleStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");
Settings noChange = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings addressesChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray()))
.build();
assertTrue(strategy.shouldRebuildConnection(addressesChanged));
Settings socketsChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections + 1)
.build();
assertTrue(strategy.shouldRebuildConnection(socketsChanged));
}
}
}
}
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),

View File

@ -487,7 +487,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
}
}
public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {
public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxyChange() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
@ -516,9 +516,12 @@ public class SniffConnectionStrategyTests extends ESTestCase {
Setting<?> seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Setting<?> numConnections = SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");
Settings noChange = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 3)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings seedsChanged = Settings.builder()
@ -530,6 +533,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
.put(proxySetting.getKey(), "proxy_address:9300")
.build();
assertTrue(strategy.shouldRebuildConnection(proxyChanged));
Settings connectionsChanged = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 4)
.build();
assertTrue(strategy.shouldRebuildConnection(connectionsChanged));
}
}
}