Fix remote cluster seeds fallback (#34090)
Recently we introduced the settings cluster.remote to take the place of search.remote for configuring remote cluster connections. We made this change due to the fact that we have generalized the remote cluster infrastructure to also be used within cross-cluster replication and not only cross-cluster search. For backwards compatibility, when we made this change, we allowed that cluster.remote would fallback to search.remote. Alas, the initial change for this contained a bug for handling the proxy and seeds settings. The bug for the seeds settings arose because we were manually iterating over the concrete settings only for cluster.remote seeds but not for search.remote seeds. This commit addresses this by iterating over both cluster.remote seeds and search.remote seeds. Additionally, when checking for existence of proxy settings, we have to not only check cluster.remote proxy settings, but also fallback to search.remote proxy settings. This commit addresses both issues, and adds tests for these situations.
This commit is contained in:
parent
609ccaad07
commit
899a7c7d99
|
@ -36,12 +36,16 @@ import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.NavigableSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -181,12 +185,36 @@ public abstract class RemoteClusterAware extends AbstractComponent {
|
||||||
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
|
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
|
||||||
*/
|
*/
|
||||||
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
|
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
|
||||||
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
|
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> remoteSeeds =
|
||||||
|
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
|
||||||
|
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> searchRemoteSeeds =
|
||||||
|
buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
|
||||||
|
// sort the intersection for predictable output order
|
||||||
|
final NavigableSet<String> intersection =
|
||||||
|
new TreeSet<>(Arrays.asList(
|
||||||
|
searchRemoteSeeds.keySet().stream().filter(s -> remoteSeeds.keySet().contains(s)).sorted().toArray(String[]::new)));
|
||||||
|
if (intersection.isEmpty() == false) {
|
||||||
|
final String message = String.format(
|
||||||
|
Locale.ROOT,
|
||||||
|
"found duplicate remote cluster configurations for cluster alias%s [%s]",
|
||||||
|
intersection.size() == 1 ? "" : "es",
|
||||||
|
String.join(",", intersection));
|
||||||
|
throw new IllegalArgumentException(message);
|
||||||
|
}
|
||||||
|
return Stream
|
||||||
|
.concat(remoteSeeds.entrySet().stream(), searchRemoteSeeds.entrySet().stream())
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(
|
||||||
|
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
|
||||||
|
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
|
||||||
return allConcreteSettings.collect(
|
return allConcreteSettings.collect(
|
||||||
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
|
Collectors.toMap(seedsSetting::getNamespace, concreteSetting -> {
|
||||||
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
|
String clusterName = seedsSetting.getNamespace(concreteSetting);
|
||||||
List<String> addresses = concreteSetting.get(settings);
|
List<String> addresses = concreteSetting.get(settings);
|
||||||
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
|
final boolean proxyMode =
|
||||||
|
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
|
||||||
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
|
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
|
||||||
for (String address : addresses) {
|
for (String address : addresses) {
|
||||||
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
|
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
|
||||||
|
|
|
@ -62,7 +62,10 @@ import java.util.stream.Collectors;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.anyOf;
|
import static org.hamcrest.Matchers.anyOf;
|
||||||
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
|
import static org.hamcrest.Matchers.hasToString;
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
public class RemoteClusterServiceTests extends ESTestCase {
|
public class RemoteClusterServiceTests extends ESTestCase {
|
||||||
|
@ -120,17 +123,19 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
||||||
|
|
||||||
public void testBuildRemoteClustersDynamicConfig() throws Exception {
|
public void testBuildRemoteClustersDynamicConfig() throws Exception {
|
||||||
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
|
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
|
||||||
Settings.builder().put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
Settings.builder()
|
||||||
|
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
||||||
.put("cluster.remote.bar.seeds", "[::1]:9090")
|
.put("cluster.remote.bar.seeds", "[::1]:9090")
|
||||||
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
|
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
|
||||||
.put("cluster.remote.boom.proxy", "foo.bar.com:1234").build());
|
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
|
||||||
assertEquals(3, map.size());
|
.put("search.remote.quux.seeds", "quux:9300")
|
||||||
assertTrue(map.containsKey("foo"));
|
.put("search.remote.quux.proxy", "quux-proxy:19300")
|
||||||
assertTrue(map.containsKey("bar"));
|
.build());
|
||||||
assertTrue(map.containsKey("boom"));
|
assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux")));
|
||||||
assertEquals(1, map.get("foo").v2().size());
|
assertThat(map.get("foo").v2(), hasSize(1));
|
||||||
assertEquals(1, map.get("bar").v2().size());
|
assertThat(map.get("bar").v2(), hasSize(1));
|
||||||
assertEquals(1, map.get("boom").v2().size());
|
assertThat(map.get("boom").v2(), hasSize(1));
|
||||||
|
assertThat(map.get("quux").v2(), hasSize(1));
|
||||||
|
|
||||||
DiscoveryNode foo = map.get("foo").v2().get(0).get();
|
DiscoveryNode foo = map.get("foo").v2().get(0).get();
|
||||||
assertEquals("", map.get("foo").v1());
|
assertEquals("", map.get("foo").v1());
|
||||||
|
@ -150,6 +155,41 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
||||||
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
|
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
|
||||||
assertEquals("foo.bar.com:1234", map.get("boom").v1());
|
assertEquals("foo.bar.com:1234", map.get("boom").v1());
|
||||||
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
||||||
|
|
||||||
|
DiscoveryNode quux = map.get("quux").v2().get(0).get();
|
||||||
|
assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
|
||||||
|
assertEquals("quux", quux.getHostName());
|
||||||
|
assertEquals(quux.getId(), "quux#quux:9300");
|
||||||
|
assertEquals("quux-proxy:19300", map.get("quux").v1());
|
||||||
|
assertEquals(quux.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
||||||
|
|
||||||
|
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.quux.seeds", "search.remote.quux.proxy"});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBuildRemoteClustersDynamicConfigWithDuplicate() {
|
||||||
|
final IllegalArgumentException e = expectThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
|
||||||
|
Settings.builder()
|
||||||
|
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
||||||
|
.put("search.remote.foo.seeds", "192.168.0.1:8080")
|
||||||
|
.build()));
|
||||||
|
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster alias [foo]")));
|
||||||
|
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.foo.seeds"});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBuildRemoteClustersDynamicConfigWithDuplicates() {
|
||||||
|
final IllegalArgumentException e = expectThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
|
||||||
|
Settings.builder()
|
||||||
|
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
||||||
|
.put("search.remote.foo.seeds", "192.168.0.1:8080")
|
||||||
|
.put("cluster.remote.bar.seeds", "192.168.0.1:8080")
|
||||||
|
.put("search.remote.bar.seeds", "192.168.0.1:8080")
|
||||||
|
.build()));
|
||||||
|
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster aliases [bar,foo]")));
|
||||||
|
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.bar.seeds", "search.remote.foo.seeds"});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGroupClusterIndices() throws IOException {
|
public void testGroupClusterIndices() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue