Cut over to use affix setting for remote cluster configuration

Instead of `search.remote.seeds.${clustername}` we now specify the seeds as:
`search.remote.${clustername}.seeds` which is a real list setting compared to an unvalidated
group setting before.
This commit is contained in:
Simon Willnauer 2017-01-11 12:36:00 +01:00
parent 6d2d878068
commit 4c61f1d75d
10 changed files with 207 additions and 93 deletions

View File

@ -236,6 +236,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
connectHandler.close();
}
public boolean isClosed() {
return connectHandler.isClosed();
}
/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
@ -350,6 +354,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
@ -409,6 +416,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
}
final boolean isClosed() {
return closed.get();
}
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

View File

@ -59,6 +59,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Basic service for accessing remote clusters via gateway nodes
@ -68,11 +70,9 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
//TODO this should be an affix settings?
public static final Setting<Settings> REMOTE_CLUSTERS_SEEDS = Setting.groupSetting("search.remote.seeds.",
RemoteClusterService::validateRemoteClustersSeeds,
Setting.Property.NodeScope,
Setting.Property.Dynamic);
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
/**
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
@ -109,12 +109,11 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
/**
* This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure
* @param seedSettings the group settings returned from {@link #REMOTE_CLUSTERS_SEEDS}
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
void updateRemoteClusters(Settings seedSettings, ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(seedSettings);
if (seeds.isEmpty()) {
connectionListener.onResponse(null);
} else {
@ -126,13 +125,27 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
nodePredicate = nodePredicate.and((node) -> Boolean.getBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (remote == null) {
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
IOUtils.close(remote);
} catch (IOException e) {
logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e);
}
remoteClusters.remove(entry.getKey());
continue;
}
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
nodePredicate);
remoteClusters.put(entry.getKey(), remote);
}
// now update the seed nodes no matter if it's new or already existing
RemoteClusterConnection finalRemote = remote;
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap(
response -> {
if (countDown.countDown()) {
@ -143,15 +156,14 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
if (countDown.fastForward()) {
connectionListener.onFailure(exception);
}
logger.error("failed to update seed list for cluster: " + entry.getKey(), exception);
if (finalRemote.isClosed() == false) {
logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception);
}
}));
}
}
if (remoteClusters.isEmpty() == false) {
remoteClusters.putAll(this.remoteClusters);
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
}
}
/**
* Returns <code>true</code> if at least one remote cluster is configured
@ -296,13 +308,38 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
return connection.getConnection(node);
}
public void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
updateRemoteClusters(Collections.singletonMap(clusterAlias, addresses.stream().map(address -> {
TransportAddress transportAddress = new TransportAddress(address);
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}).collect(Collectors.toList())),
ActionListener.wrap((x) -> {}, (x) -> {}) );
}
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Map<String, List<DiscoveryNode>> remoteClustersNodes = new HashMap<>();
for (String clusterName : settings.names()) {
String[] remoteHosts = settings.getAsArray(clusterName);
for (String remoteHost : remoteHosts) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
}
return nodes;
}));
}
static final InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
InetAddress hostAddress;
try {
@ -310,51 +347,16 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost,
new TransportAddress(new InetSocketAddress(hostAddress, port)),
Version.CURRENT.minimumCompatibilityVersion());
List<DiscoveryNode> nodes = remoteClustersNodes.get(clusterName);
if (nodes == null) {
nodes = new ArrayList<>();
remoteClustersNodes.put(clusterName, nodes);
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
nodes.add(node);
}
}
return remoteClustersNodes;
return new InetSocketAddress(hostAddress, port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number");
}
static void validateRemoteClustersSeeds(Settings settings) {
for (String clusterName : settings.names()) {
String[] remoteHosts = settings.getAsArray(clusterName);
if (remoteHosts.length == 0) {
throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required");
}
for (String remoteHost : remoteHosts) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " +
"instead for remote cluster [" + clusterName + "]");
}
String host = remoteHost.substring(0, portSeparator);
try {
InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
String port = remoteHost.substring(portSeparator + 1);
try {
Integer portValue = Integer.valueOf(port);
if (portValue <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + portValue + "]");
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number, found [" + port + "] instead for remote cluster [" +
clusterName + "]");
}
}
}
}
/**
@ -364,7 +366,8 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
updateRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings), future);
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {

View File

@ -83,10 +83,8 @@ public class SearchTransportService extends AbstractLifecycleComponent {
super(settings);
this.transportService = transportService;
this.remoteClusterService = new RemoteClusterService(settings, transportService);
final Consumer<Settings> clusterUpdateConsumer = (s) -> remoteClusterService.updateRemoteClusters(s,
ActionListener.wrap((x) -> {}, (x) -> {}));
clusterSettings.addSettingsUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, clusterUpdateConsumer,
RemoteClusterService::validateRemoteClustersSeeds);
clusterSettings.addAffixUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, remoteClusterService::updateRemoteCluster,
(namespace, value) -> {});
}
public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) {

View File

@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@ -480,7 +481,7 @@ public class Setting<T> extends ToXContentToBytes {
public Map<AbstractScopedSettings.SettingUpdater<T>, T> getValue(Settings current, Settings previous) {
// we collect all concrete keys and then delegate to the actual setting for validation and settings extraction
final Map<AbstractScopedSettings.SettingUpdater<T>, T> result = new IdentityHashMap<>();
Stream.concat(matchStream(current), matchStream(previous)).forEach(aKey -> {
Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> {
String namespace = key.getNamespace(aKey);
AbstractScopedSettings.SettingUpdater<T> updater =
getConcreteSetting(aKey).newUpdater((v) -> consumer.accept(namespace, v), logger,
@ -504,6 +505,18 @@ public class Setting<T> extends ToXContentToBytes {
};
}
@Override
public T get(Settings settings) {
throw new UnsupportedOperationException("affix settings can't return values" +
" use #getConcreteSetting to obtain a concrete setting");
}
@Override
public String getRaw(Settings settings) {
throw new UnsupportedOperationException("affix settings can't return values" +
" use #getConcreteSetting to obtain a concrete setting");
}
@Override
public Setting<T> getConcreteSetting(String key) {
if (match(key)) {
@ -517,6 +530,22 @@ public class Setting<T> extends ToXContentToBytes {
public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) {
matchStream(defaultSettings).forEach((key) -> getConcreteSetting(key).diff(builder, source, defaultSettings));
}
/**
* Returns the namespace for a concrete settting. Ie. an affix setting with prefix: <tt>search.</tt> and suffix: <tt>username</tt>
* will return <tt>remote</tt> as a namespace for the setting <tt>search.remote.username</tt>
*/
public String getNamespace(Setting<T> concreteSetting) {
return key.getNamespace(concreteSetting.getKey());
}
/**
* Returns a stream of all concrete setting instances for the given settings. AffixSetting is only a specification, concrete
* settings depend on an actual set of setting keys.
*/
public Stream<Setting<T>> getAllConcreteSettings(Settings settings) {
return matchStream(settings).distinct().map(this::getConcreteSetting);
}
}

View File

@ -477,8 +477,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
boolean success = false;
NodeChannels nodeChannels = null;
try {
NodeChannels nodeChannels = connectToChannels(node, connectionProfile);
nodeChannels = connectToChannels(node, connectionProfile);
final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
@ -487,13 +489,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
// ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure
// only relevant exceptions are logged on the caller end.. this is the same as in connectToNode
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(nodeChannels);
}
}
}

View File

@ -85,20 +85,20 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testRemoteClusterSeedSetting() {
// simple validation
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
.put("search.remote.seeds.foo", "192.168.0.1:8080")
.put("search.remote.seeds.bar", "[::1]:9090").build());
Settings settings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seed", "[::1]:9090").build();
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
.put("search.remote.seeds.foo", "192.168.0.1").build()));
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
}
public void testBuiltRemoteClustersSeeds() throws Exception {
Map<String, List<DiscoveryNode>> map = RemoteClusterService.buildRemoteClustersSeeds(
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
.put("search.remote.seeds.foo", "192.168.0.1:8080")
.put("search.remote.seeds.bar", "[::1]:9090").build()));
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build());
assertEquals(2, map.size());
assertTrue(map.containsKey("foo"));
assertTrue(map.containsKey("bar"));
@ -133,8 +133,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putArray("search.remote.seeds.cluster_1", seedNode.getAddress().toString());
builder.putArray("search.remote.seeds.cluster_2", otherSeedNode.getAddress().toString());
builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -154,6 +154,41 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
public void testIncrementallyAddClusters() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
service.updateRemoteCluster("cluster_2", Collections.emptyList());
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
}
}
}
}
public void testProcessRemoteShards() throws IOException {
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) {
assertFalse(service.isCrossClusterSearchEnabled());

View File

@ -31,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -475,6 +477,33 @@ public class SettingTests extends ESTestCase {
assertFalse(listAffixSetting.match("foo"));
}
public void testGetAllConcreteSettings() {
Setting.AffixSetting<List<String>> listAffixSetting = Setting.affixKeySetting("foo.", "bar",
(key) -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope));
Settings settings = Settings.builder()
.putArray("foo.1.bar", "1", "2")
.putArray("foo.2.bar", "3", "4", "5")
.putArray("foo.bar", "6")
.putArray("some.other", "6")
.putArray("foo.3.bar", "6")
.build();
Stream<Setting<List<String>>> allConcreteSettings = listAffixSetting.getAllConcreteSettings(settings);
Map<String, List<String>> collect = allConcreteSettings.collect(Collectors.toMap(Setting::getKey, (s) -> s.get(settings)));
assertEquals(3, collect.size());
assertEquals(Arrays.asList("1", "2"), collect.get("foo.1.bar"));
assertEquals(Arrays.asList("3", "4", "5"), collect.get("foo.2.bar"));
assertEquals(Arrays.asList("6"), collect.get("foo.3.bar"));
}
public void testAffixSettingsFailOnGet() {
Setting.AffixSetting<List<String>> listAffixSetting = Setting.affixKeySetting("foo.", "bar",
(key) -> Setting.listSetting(key, Collections.singletonList("testelement"), Function.identity(), Property.NodeScope));
expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.get(Settings.EMPTY));
expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.getRaw(Settings.EMPTY));
assertEquals(Collections.singletonList("testelement"), listAffixSetting.getDefault(Settings.EMPTY));
assertEquals("[\"testelement\"]", listAffixSetting.getDefaultRaw(Settings.EMPTY));
}
public void testMinMaxInt() {
Setting<Integer> integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, Property.NodeScope);

View File

@ -24,9 +24,10 @@ remote clusters that should be connected to, for instance:
--------------------------------
search:
remote:
seeds:
cluster_one: 127.0.0.1:9300 <1>
cluster_two: 127.0.0.1:9301 <1>
cluster_one: <1>
seeds: 127.0.0.1:9300
cluster_two: <1>
seeds: 127.0.0.1:9301
--------------------------------
<1> `cluster_one` and `cluster_two` are arbitrary names representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices.

View File

@ -35,7 +35,7 @@ task mixedClusterTest(type: RestIntegTestTask) {
dependsOn(remoteClusterTest)
cluster {
distribution = 'zip'
setting 'search.remote.seeds.my_remote_cluster', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'search.remote.connections_per_cluster', 1
}

View File

@ -99,16 +99,16 @@
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.seeds.my_remote_cluster: remote_ip }
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.seeds.test_remote_cluster: $remote_ip
search.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.seeds.test_remote_cluster: $remote_ip}}
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
search: