Extract a common base class to allow services to listen to remote cluster config updates (#24367)

RemoteClusterService is an internal service that should not necessarily be exposed
to plugins or other parts of the system. Yet, for cluster name parsing for instance
it is crucial to reuse some code that is used for the RemoteClusterService. This
change extracts a base class that allows to share the settings related code as well
as cluster settings updates to `search.remote.*` to be observed by other services.
This commit is contained in:
Simon Willnauer 2017-05-02 16:02:36 +02:00 committed by GitHub
parent 1435c23df2
commit 691ec68a3c
7 changed files with 181 additions and 114 deletions

View File

@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
*/
public abstract class RemoteClusterAware extends AbstractComponent {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
protected static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
* Creates a new {@link RemoteClusterAware} instance
* @param settings the nodes level settings
*/
protected RemoteClusterAware(Settings settings) {
super(settings);
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}
protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
}
return nodes;
}));
}
/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists in the local cluster
*
* @return a map of grouped remote and local indices
*/
protected Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
String remoteClusterName = index.substring(0, i);
List<String> clusters = clusterNameResolver.resolveClusterNames(remoteClusterNames, remoteClusterName);
if (clusters.isEmpty() == false) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Can not filter indices; index " + index +
" exists but there is also a remote cluster named: " + remoteClusterName);
}
String indexName = index.substring(i + 1);
for (String clusterName : clusters) {
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
}
} else {
perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
} else {
perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
}
return perClusterIndices;
}
protected abstract Set<String> getRemoteClusterNames();
/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses);
/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster,
(namespace, value) -> {});
}
private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
}
}
}

View File

@ -27,11 +27,9 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -46,10 +44,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -64,21 +59,12 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Basic service for accessing remote clusters via gateway nodes
*/
public final class RemoteClusterService extends AbstractComponent implements Closeable {
public final class RemoteClusterService extends RemoteClusterAware implements Closeable {
static final String LOCAL_CLUSTER_GROUP_KEY = "";
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
/**
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
@ -109,17 +95,13 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true,
Setting.Property.NodeScope);
private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
private final TransportService transportService;
private final int numRemoteConnections;
private final ClusterNameExpressionResolver clusterNameResolver;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
}
@ -195,46 +177,6 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
return remoteClusters.get(remoteCluster).isNodeConnected(node);
}
/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists
*
* @return a map of grouped remote and local indices
*/
Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = this.remoteClusters.keySet();
for (String index : requestIndices) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
String remoteClusterName = index.substring(0, i);
List<String> clusters = clusterNameResolver.resolveClusterNames(remoteClusterNames, remoteClusterName);
if (clusters.isEmpty() == false) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Can not filter indices; index " + index +
" exists but there is also a remote cluster named: " + remoteClusterName);
}
String indexName = index.substring(i + 1);
for (String clusterName : clusters) {
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
}
} else {
perClusterIndices.computeIfAbsent(LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
} else {
perClusterIndices.computeIfAbsent(LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
}
return perClusterIndices;
}
/**
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
*/
@ -342,7 +284,12 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
return connection.getConnection(node);
}
void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
@Override
protected Set<String> getRemoteClusterNames() {
return this.remoteClusters.keySet();
}
protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}
@ -359,47 +306,6 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener);
}
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
}
return nodes;
}));
}
private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
}
}
/**
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
* to all configured seed nodes.
@ -407,7 +313,7 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(settings);
Map<String, List<DiscoveryNode>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);

View File

@ -86,9 +86,7 @@ public class SearchTransportService extends AbstractLifecycleComponent {
this.transportService = transportService;
this.remoteClusterService = new RemoteClusterService(settings, transportService);
if (connectToRemote) {
clusterSettings.addAffixUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, remoteClusterService::updateRemoteCluster,
(namespace, value) -> {
});
remoteClusterService.listenForUpdates(clusterSettings);
}
}

View File

@ -179,7 +179,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final Map<String, List<String>> groupedIndices = remoteClusterService.groupClusterIndices(searchRequest.indices(),
// empty string is not allowed
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
List<String> remove = groupedIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
List<String> remove = groupedIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
String[] indices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
localIndices = new OriginalIndices(indices, searchRequest.indicesOptions());
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.RemoteClusterAware;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
@ -254,7 +255,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterService.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,

View File

@ -78,7 +78,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
@ -89,12 +89,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
Settings settings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seed", "[::1]:9090").build();
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
}
public void testBuiltRemoteClustersSeeds() throws Exception {
@ -145,9 +145,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY));
assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1"));
@ -195,7 +195,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.updateRemoteCluster("cluster_2", Collections.emptyList());
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> service.updateRemoteCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
() -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
}
}

View File

@ -81,7 +81,7 @@ public class SearchAsyncActionTests extends ESTestCase {
randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) {
Collections.singleton(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS)), null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) {
numFreedContext.incrementAndGet();