From 936dbb00e31a5c238f355d2bb90beb83b7edbf4e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 4 Mar 2019 15:51:17 +0100 Subject: [PATCH] Isolate Zen1 (#39470) Cherry-picks a few commits from #39466 to align 7.x with master branch. --- docs/plugins/integrations.asciidoc | 2 +- docs/reference/getting-started.asciidoc | 4 +- docs/reference/modules/transport.asciidoc | 2 +- .../state/TransportClusterStateAction.java | 5 +- .../elasticsearch/cluster/ClusterState.java | 6 +- .../cluster/NodeConnectionsService.java | 7 +- .../cluster/coordination/Coordinator.java | 4 + .../cluster/coordination/JoinHelper.java | 6 +- .../coordination/ValidateJoinRequest.java | 52 + .../common/settings/ClusterSettings.java | 12 +- .../discovery/SeedHostsResolver.java | 131 ++- .../discovery/zen/MembershipAction.java | 29 +- .../discovery/zen/UnicastZenPing.java | 127 +-- .../cluster/MinimumMasterNodesIT.java | 2 +- .../ElasticsearchNodeCommandIT.java | 2 +- .../coordination/JoinTaskExecutorTests.java} | 4 +- .../coordination/PublicationTests.java | 45 +- .../discovery/DiscoveryModuleTests.java | 6 +- .../FileBasedSeedHostsProviderTests.java | 5 +- .../discovery/SeedHostsResolverTests.java | 293 +++++- .../discovery/zen/UnicastZenPingTests.java | 940 ------------------ .../discovery/zen/ZenDiscoveryUnitTests.java | 5 +- .../gateway/GatewayIndexStateIT.java | 7 +- .../test/transport/MockTransportService.java | 2 +- 24 files changed, 559 insertions(+), 1139 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java rename server/src/test/java/org/elasticsearch/{discovery/zen/MembershipActionTests.java => cluster/coordination/JoinTaskExecutorTests.java} (98%) delete mode 100644 server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java diff --git a/docs/plugins/integrations.asciidoc b/docs/plugins/integrations.asciidoc index 6d543408f67..efbc83f2f8f 100644 --- a/docs/plugins/integrations.asciidoc +++ b/docs/plugins/integrations.asciidoc @@ -194,7 +194,7 @@ releases 2.0 and later do not support rivers. A pluggable elastic JavaScript query DSL builder for Elasticsearch * https://www.wireshark.org/[Wireshark]: - Protocol dissection for Zen discovery, HTTP and the binary protocol + Protocol dissection for HTTP and the transport protocol * https://www.itemsapi.com/[ItemsAPI]: Search backend for mobile and web diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index bee2ae51944..9e7571e0383 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -248,8 +248,8 @@ If everything goes well with installation, you should see a bunch of messages th [2018-09-13T12:20:05,202][INFO ][o.e.t.TransportService ] [localhost.localdomain] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300} [2018-09-13T12:20:05,221][WARN ][o.e.b.BootstrapChecks ] [localhost.localdomain] max file descriptors [4096] for elasticsearch process is too low, increase to at least [65535] [2018-09-13T12:20:05,221][WARN ][o.e.b.BootstrapChecks ] [localhost.localdomain] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144] -[2018-09-13T12:20:08,355][INFO ][o.e.c.s.MasterService ] [localhost.localdomain] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: master node changed {previous [], current [{localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test}]} -[2018-09-13T12:20:08,360][INFO ][o.e.c.s.ClusterApplierService] [localhost.localdomain] master node changed {previous [], current [{localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test}]}, reason: apply cluster state (from master [master {localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]]) +[2018-09-13T12:20:08,355][INFO ][o.e.c.s.MasterService ] [localhost.localdomain] elected-as-master ([0] nodes joined)[, ], reason: master node changed {previous [], current [{localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test}]} +[2018-09-13T12:20:08,360][INFO ][o.e.c.s.ClusterApplierService] [localhost.localdomain] master node changed {previous [], current [{localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test}]}, reason: apply cluster state (from master [master {localhost.localdomain}{B0aEHNagTiWx7SYj-l4NTw}{hzsQz6CVQMCTpMCVLM4IHg}{127.0.0.1}{127.0.0.1:9300}{testattr=test} committed version [1] source [elected-as-master ([0] nodes joined)[, ]]]) [2018-09-13T12:20:08,384][INFO ][o.e.h.n.Netty4HttpServerTransport] [localhost.localdomain] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200} [2018-09-13T12:20:08,384][INFO ][o.e.n.Node ] [localhost.localdomain] started diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 180a8190868..8ee2f55a9b1 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -156,7 +156,7 @@ PUT _cluster/settings { "transient" : { "transport.tracer.include" : "*", - "transport.tracer.exclude" : "internal:discovery/zen/fd*" + "transport.tracer.exclude" : "internal:coordination/fault_detection/*" } } -------------------------------------------------- diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 75dc811f37d..3eb3f1746b4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.PublicationTransportHandler; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -42,8 +43,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.function.Predicate; -import static org.elasticsearch.discovery.zen.PublishClusterStateAction.serializeFullClusterState; - public class TransportClusterStateAction extends TransportMasterNodeReadAction { @@ -185,7 +184,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction * The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state * differences instead of the entire state on each change. The publishing mechanism should only send differences diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index d6c2824fdbb..28249754b50 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -21,6 +21,8 @@ package org.elasticsearch.cluster; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.coordination.FollowersChecker; +import org.elasticsearch.cluster.coordination.LeaderChecker; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -32,8 +34,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; -import org.elasticsearch.discovery.zen.MasterFaultDetection; -import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -51,8 +51,7 @@ import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; * This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are * removed. Also, it periodically checks that all connections are still open and if needed restores them. * Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond - * to pings. This is done by {@link NodesFaultDetection}. Master fault detection - * is done by {@link MasterFaultDetection}. + * to pings. This is done by {@link FollowersChecker}. Master fault detection is done by {@link LeaderChecker}. */ public class NodeConnectionsService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 1f640acc54f..73feb21990e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -1071,6 +1071,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } + public Collection> getOnJoinValidators() { + return onJoinValidators; + } + public enum Mode { CANDIDATE, LEADER, FOLLOWER } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index a9309e9fe63..ad3ab9c4147 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -131,7 +131,7 @@ public class JoinHelper { }); transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME, - MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC, + ValidateJoinRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { final ClusterState localState = currentStateSupplier.get(); if (localState.metaData().clusterUUIDCommitted() && @@ -145,7 +145,7 @@ public class JoinHelper { }); transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME, - MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC, + ValidateJoinRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); channel.sendResponse(Empty.INSTANCE); @@ -276,7 +276,7 @@ public class JoinHelper { actionName = VALIDATE_JOIN_ACTION_NAME; } transportService.sendRequest(node, actionName, - new MembershipAction.ValidateJoinRequest(state), + new ValidateJoinRequest(state), TransportRequestOptions.builder().withTimeout(joinTimeout).build(), new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java new file mode 100644 index 00000000000..dec4a13c67d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +public class ValidateJoinRequest extends TransportRequest { + private ClusterState state; + + public ValidateJoinRequest() {} + + public ValidateJoinRequest(ClusterState state) { + this.state = state; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.state = ClusterState.readFrom(in, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + this.state.writeTo(out); + } + + public ClusterState getState() { + return state; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 0ae30c465b2..95755108047 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -67,10 +67,10 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.PeerFinder; +import org.elasticsearch.discovery.SeedHostsResolver; +import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.FaultDetection; -import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -407,10 +407,10 @@ public final class ClusterSettings extends AbstractScopedSettings { ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING, SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING, SettingsBasedSeedHostsProvider.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING, - UnicastZenPing.DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING, - UnicastZenPing.DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING, - UnicastZenPing.LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, - UnicastZenPing.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, + SeedHostsResolver.DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING, + SeedHostsResolver.DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING, + SeedHostsResolver.LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, + SeedHostsResolver.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, SearchService.DEFAULT_KEEPALIVE_SETTING, SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java index b656cc288e3..926216b9b68 100644 --- a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java +++ b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java @@ -23,24 +23,46 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver; -import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Collectors; public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver { + public static final Setting LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = + Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope, + Setting.Property.Deprecated); + public static final Setting LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = + Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), + Setting.Property.NodeScope, Setting.Property.Deprecated); + public static final Setting DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING = + Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope); + public static final Setting DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING = + Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class); private final Settings settings; @@ -58,8 +80,109 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con this.nodeName = nodeName; this.transportService = transportService; this.hostsProvider = seedProvider; - resolveTimeout = UnicastZenPing.getResolveTimeout(settings); - concurrentConnects = UnicastZenPing.getMaxConcurrentResolvers(settings); + resolveTimeout = getResolveTimeout(settings); + concurrentConnects = getMaxConcurrentResolvers(settings); + } + + public static int getMaxConcurrentResolvers(Settings settings) { + if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) { + if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) { + throw new IllegalArgumentException("it is forbidden to set both [" + + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and [" + + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]"); + } + return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); + } + return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings); + } + + public static TimeValue getResolveTimeout(Settings settings) { + if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) { + if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) { + throw new IllegalArgumentException("it is forbidden to set both [" + + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and [" + + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]"); + } + return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); + } + return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings); + } + + /** + * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of + * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up + * to the specified resolve timeout. + * + * @param executorService the executor service used to parallelize hostname lookups + * @param logger logger used for logging messages regarding hostname lookups + * @param hosts the hosts to resolve + * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) + * @param transportService the transport service + * @param resolveTimeout the timeout before returning from hostname lookups + * @return a list of resolved transport addresses + */ + public static List resolveHostsLists( + final ExecutorService executorService, + final Logger logger, + final List hosts, + final int limitPortCounts, + final TransportService transportService, + final TimeValue resolveTimeout) { + Objects.requireNonNull(executorService); + Objects.requireNonNull(logger); + Objects.requireNonNull(hosts); + Objects.requireNonNull(transportService); + Objects.requireNonNull(resolveTimeout); + if (resolveTimeout.nanos() < 0) { + throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); + } + // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete + final List> callables = + hosts + .stream() + .map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts)) + .collect(Collectors.toList()); + final List> futures; + try { + futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Collections.emptyList(); + } + final List transportAddresses = new ArrayList<>(); + final Set localAddresses = new HashSet<>(); + localAddresses.add(transportService.boundAddress().publishAddress()); + localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); + // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the + // hostname with the corresponding task by iterating together + final Iterator it = hosts.iterator(); + for (final Future future : futures) { + final String hostname = it.next(); + if (!future.isCancelled()) { + assert future.isDone(); + try { + final TransportAddress[] addresses = future.get(); + logger.trace("resolved host [{}] to {}", hostname, addresses); + for (int addressId = 0; addressId < addresses.length; addressId++) { + final TransportAddress address = addresses[addressId]; + // no point in pinging ourselves + if (localAddresses.contains(address) == false) { + transportAddresses.add(address); + } + } + } catch (final ExecutionException e) { + assert e.getCause() != null; + final String message = "failed to resolve host [" + hostname + "]"; + logger.warn(message, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore + } + } else { + logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); + } + } + return Collections.unmodifiableList(transportAddresses); } @Override @@ -102,7 +225,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con List providedAddresses = hostsProvider.getSeedAddresses((hosts, limitPortCounts) - -> UnicastZenPing.resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts, + -> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts, transportService, resolveTimeout)); consumer.accept(providedAddresses); diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index 550b25083fb..9b8d9f14c2e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ValidateJoinRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -159,32 +160,6 @@ public class MembershipAction { } } - public static class ValidateJoinRequest extends TransportRequest { - private ClusterState state; - - public ValidateJoinRequest() {} - - public ValidateJoinRequest(ClusterState state) { - this.state = state; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - this.state = ClusterState.readFrom(in, null); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - this.state.writeTo(out); - } - - public ClusterState getState() { - return state; - } - } - static class ValidateJoinRequestRequestHandler implements TransportRequestHandler { private final Supplier localNodeSupplier; private final Collection> joinValidators; @@ -199,7 +174,7 @@ public class MembershipAction { public void messageReceived(ValidateJoinRequest request, TransportChannel channel, Task task) throws Exception { DiscoveryNode node = localNodeSupplier.get(); assert node != null : "local node is null"; - joinValidators.stream().forEach(action -> action.accept(node, request.state)); + joinValidators.stream().forEach(action -> action.accept(node, request.getState())); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index d0cd0e27976..d6b6f00311b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -34,8 +34,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -46,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.discovery.SeedHostsProvider; +import org.elasticsearch.discovery.SeedHostsResolver; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -68,18 +67,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -98,17 +90,6 @@ public class UnicastZenPing implements ZenPing { public static final String ACTION_NAME = "internal:discovery/zen/unicast"; - public static final Setting LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = - Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope, Property.Deprecated); - public static final Setting LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = - Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), - Property.NodeScope, Property.Deprecated); - - public static final Setting DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING = - Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Property.NodeScope); - public static final Setting DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING = - Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Property.NodeScope); - private final ThreadPool threadPool; private final TransportService transportService; private final ClusterName clusterName; @@ -140,8 +121,8 @@ public class UnicastZenPing implements ZenPing { this.hostsProvider = seedHostsProvider; this.contextProvider = contextProvider; - final int concurrentConnects = getMaxConcurrentResolvers(settings); - resolveTimeout = getResolveTimeout(settings); + final int concurrentConnects = SeedHostsResolver.getMaxConcurrentResolvers(settings); + resolveTimeout = SeedHostsResolver.getResolveTimeout(settings); nodeName = Node.NODE_NAME_SETTING.get(settings); logger.debug( "using max_concurrent_resolvers [{}], resolver timeout [{}]", @@ -162,85 +143,8 @@ public class UnicastZenPing implements ZenPing { threadPool.getThreadContext()); } - /** - * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of - * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up - * to the specified resolve timeout. - * - * @param executorService the executor service used to parallelize hostname lookups - * @param logger logger used for logging messages regarding hostname lookups - * @param hosts the hosts to resolve - * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) - * @param transportService the transport service - * @param resolveTimeout the timeout before returning from hostname lookups - * @return a list of resolved transport addresses - */ - public static List resolveHostsLists( - final ExecutorService executorService, - final Logger logger, - final List hosts, - final int limitPortCounts, - final TransportService transportService, - final TimeValue resolveTimeout) { - Objects.requireNonNull(executorService); - Objects.requireNonNull(logger); - Objects.requireNonNull(hosts); - Objects.requireNonNull(transportService); - Objects.requireNonNull(resolveTimeout); - if (resolveTimeout.nanos() < 0) { - throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); - } - // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete - final List> callables = - hosts - .stream() - .map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts)) - .collect(Collectors.toList()); - final List> futures; - try { - futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Collections.emptyList(); - } - final List transportAddresses = new ArrayList<>(); - final Set localAddresses = new HashSet<>(); - localAddresses.add(transportService.boundAddress().publishAddress()); - localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); - // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the - // hostname with the corresponding task by iterating together - final Iterator it = hosts.iterator(); - for (final Future future : futures) { - final String hostname = it.next(); - if (!future.isCancelled()) { - assert future.isDone(); - try { - final TransportAddress[] addresses = future.get(); - logger.trace("resolved host [{}] to {}", hostname, addresses); - for (int addressId = 0; addressId < addresses.length; addressId++) { - final TransportAddress address = addresses[addressId]; - // no point in pinging ourselves - if (localAddresses.contains(address) == false) { - transportAddresses.add(address); - } - } - } catch (final ExecutionException e) { - assert e.getCause() != null; - final String message = "failed to resolve host [" + hostname + "]"; - logger.warn(message, e.getCause()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore - } - } else { - logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); - } - } - return Collections.unmodifiableList(transportAddresses); - } - private SeedHostsProvider.HostsResolver createHostsResolver() { - return (hosts, limitPortCounts) -> resolveHostsLists(unicastZenPingExecutorService, logger, hosts, + return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, limitPortCounts, transportService, resolveTimeout); } @@ -672,27 +576,4 @@ public class UnicastZenPing implements ZenPing { return Version.CURRENT; // for tests } - public static int getMaxConcurrentResolvers(Settings settings) { - if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) { - if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) { - throw new IllegalArgumentException("it is forbidden to set both [" - + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and [" - + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]"); - } - return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - } - return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings); - } - - public static TimeValue getResolveTimeout(Settings settings) { - if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) { - if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) { - throw new IllegalArgumentException("it is forbidden to set both [" - + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and [" - + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]"); - } - return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); - } - return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings); - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 4df8ecaf77b..8f395c2d137 100644 --- a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -60,7 +60,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) -@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE") +@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") public class MinimumMasterNodesIT extends ESIntegTestCase { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandIT.java index 6fe7b65fad7..517db688e81 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandIT.java @@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) -@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE") +@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") public class ElasticsearchNodeCommandIT extends ESIntegTestCase { private MockTerminal executeCommand(ElasticsearchNodeCommand command, Environment environment, int nodeOrdinal, boolean abort) diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java similarity index 98% rename from server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java rename to server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java index cafe050726c..35fa5786bbd 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.discovery.zen; +package org.elasticsearch.cluster.coordination; import org.elasticsearch.Version; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; @@ -36,7 +36,7 @@ import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; -public class MembershipActionTests extends ESTestCase { +public class JoinTaskExecutorTests extends ESTestCase { public void testPreventJoinClusterWithNewerIndices() { Settings.builder().build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index d332888c185..d4cab5110ee 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -25,11 +25,12 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; @@ -45,6 +46,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -57,6 +60,7 @@ import java.util.stream.Stream; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; public class PublicationTests extends ESTestCase { @@ -478,4 +482,43 @@ public class PublicationTests extends ESTestCase { return ts.stream(); }); } + + public static class AssertingAckListener implements Discovery.AckListener { + private final List> errors = new CopyOnWriteArrayList<>(); + private final Set successfulAcks = Collections.synchronizedSet(new HashSet<>()); + private final CountDownLatch countDown; + private final CountDownLatch commitCountDown; + + public AssertingAckListener(int nodeCount) { + countDown = new CountDownLatch(nodeCount); + commitCountDown = new CountDownLatch(1); + } + + @Override + public void onCommit(TimeValue commitTime) { + commitCountDown.countDown(); + } + + @Override + public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { + if (e != null) { + errors.add(new Tuple<>(node, e)); + } else { + successfulAcks.add(node); + } + countDown.countDown(); + } + + public Set await(long timeout, TimeUnit unit) throws InterruptedException { + assertThat(awaitErrors(timeout, unit), emptyIterable()); + assertTrue(commitCountDown.await(timeout, unit)); + return new HashSet<>(successfulAcks); + } + + public List> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { + countDown.await(timeout, unit); + return errors; + } + + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 30b0448e09f..fe74a736fe3 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.test.ESTestCase; @@ -194,15 +193,14 @@ public class DiscoveryModuleTests extends ESTestCase { public void testJoinValidator() { BiConsumer consumer = (a, b) -> {}; - // TODO: move to zen2 once join validators are implemented DiscoveryModule module = newModule(Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), - DiscoveryModule.ZEN_DISCOVERY_TYPE).build(), Collections.singletonList(new DiscoveryPlugin() { + DiscoveryModule.ZEN2_DISCOVERY_TYPE).build(), Collections.singletonList(new DiscoveryPlugin() { @Override public BiConsumer getJoinValidator() { return consumer; } })); - ZenDiscovery discovery = (ZenDiscovery) module.getDiscovery(); + Coordinator discovery = (Coordinator) module.getDiscovery(); Collection> onJoinValidators = discovery.getOnJoinValidators(); assertEquals(2, onJoinValidators.size()); assertTrue(onJoinValidators.contains(consumer)); diff --git a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java index 4df4fc53aff..fc89bd1a2f3 100644 --- a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; @@ -117,7 +116,7 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase { public void testUnicastHostsDoesNotExist() { final FileBasedSeedHostsProvider provider = new FileBasedSeedHostsProvider(createTempDir().toAbsolutePath()); final List addresses = provider.getSeedAddresses((hosts, limitPortCounts) -> - UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, + SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, TimeValue.timeValueSeconds(10))); assertEquals(0, addresses.size()); } @@ -147,7 +146,7 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase { } return new FileBasedSeedHostsProvider(configPath).getSeedAddresses((hosts, limitPortCounts) -> - UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, + SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService, TimeValue.timeValueSeconds(10))); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java index 3254d2e9201..0506f5c48e8 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java @@ -19,24 +19,57 @@ package org.elasticsearch.discovery; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.MockNioTransport; import org.junit.After; import org.junit.Before; +import org.mockito.Matchers; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsNull.nullValue; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class SeedHostsResolverTests extends ESTestCase { @@ -44,6 +77,10 @@ public class SeedHostsResolverTests extends ESTestCase { private List transportAddresses; private SeedHostsResolver seedHostsResolver; private ThreadPool threadPool; + private ExecutorService executorService; + // close in reverse order as opened + private Stack closeables; + @Before public void startResolver() { @@ -55,12 +92,29 @@ public class SeedHostsResolverTests extends ESTestCase { seedHostsResolver = new SeedHostsResolver("test_node", Settings.EMPTY, transportService, hostsResolver -> transportAddresses); seedHostsResolver.start(); + + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]"); + executorService = + EsExecutors.newScaling( + getClass().getName() + "/" + getTestName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); + closeables = new Stack<>(); } @After - public void stopResolver() { + public void stopResolver() throws IOException { seedHostsResolver.stop(); - threadPool.shutdown(); + try { + logger.info("shutting down..."); + // JDK stack is broken, it does not iterate in the expected order (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4475301) + final List reverse = new ArrayList<>(); + while (!closeables.isEmpty()) { + reverse.add(closeables.pop()); + } + IOUtils.close(reverse); + } finally { + terminate(executorService); + terminate(threadPool); + } } public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws Exception { @@ -92,4 +146,239 @@ public class SeedHostsResolverTests extends ESTestCase { assertTrue(endLatch.await(30, TimeUnit.SECONDS)); assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses)); } + + public void testPortLimit() { + final NetworkService networkService = new NetworkService(Collections.emptyList()); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9500) + ); + } + }; + closeables.push(transport); + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + final int limitPortCounts = randomIntBetween(1, 10); + final List transportAddresses = SeedHostsResolver.resolveHostsLists( + executorService, + logger, + Collections.singletonList("127.0.0.1"), + limitPortCounts, + transportService, + TimeValue.timeValueSeconds(30)); + assertThat(transportAddresses, hasSize(limitPortCounts)); + final Set ports = new HashSet<>(); + for (final TransportAddress address : transportAddresses) { + assertTrue(address.address().getAddress().isLoopbackAddress()); + ports.add(address.getPort()); + } + assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet()))); + } + + public void testRemovingLocalAddresses() { + final NetworkService networkService = new NetworkService(Collections.emptyList()); + final InetAddress loopbackAddress = InetAddress.getLoopbackAddress(); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{ + new TransportAddress(loopbackAddress, 9300), + new TransportAddress(loopbackAddress, 9301) + }, + new TransportAddress(loopbackAddress, 9302) + ); + } + }; + closeables.push(transport); + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + final List transportAddresses = SeedHostsResolver.resolveHostsLists( + executorService, + logger, + Collections.singletonList(NetworkAddress.format(loopbackAddress)), + 10, + transportService, + TimeValue.timeValueSeconds(30)); + assertThat(transportAddresses, hasSize(7)); + final Set ports = new HashSet<>(); + for (final TransportAddress address : transportAddresses) { + assertTrue(address.address().getAddress().isLoopbackAddress()); + ports.add(address.getPort()); + } + assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet()))); + } + + public void testUnknownHost() { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Collections.emptyList()); + final String hostname = randomAlphaOfLength(8); + final UnknownHostException unknownHostException = new UnknownHostException(hostname); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9300) + ); + } + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + throw unknownHostException; + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + + final List transportAddresses = SeedHostsResolver.resolveHostsLists( + executorService, + logger, + Arrays.asList(hostname), + 1, + transportService, + TimeValue.timeValueSeconds(30) + ); + + assertThat(transportAddresses, empty()); + verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException); + } + + public void testResolveTimeout() { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Collections.emptyList()); + final CountDownLatch latch = new CountDownLatch(1); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9500) + ); + } + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + if ("hostname1".equals(address)) { + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } else if ("hostname2".equals(address)) { + try { + latch.await(); + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + throw new UnknownHostException(address); + } + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(3, 5)); + try { + final List transportAddresses = SeedHostsResolver.resolveHostsLists( + executorService, + logger, + Arrays.asList("hostname1", "hostname2"), + 1, + transportService, + resolveTimeout); + + assertThat(transportAddresses, hasSize(1)); + verify(logger).trace( + "resolved host [{}] to {}", "hostname1", + new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); + verify(logger).warn("timed out after [{}] resolving host [{}]", resolveTimeout, "hostname2"); + verifyNoMoreInteractions(logger); + } finally { + latch.countDown(); + } + } + + public void testInvalidHosts() { + final Logger logger = mock(Logger.class); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9300) + ); + } + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + final List transportAddresses = SeedHostsResolver.resolveHostsLists( + executorService, + logger, + Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), + 1, + transportService, + TimeValue.timeValueSeconds(30)); + assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used + assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1")); + assertThat(transportAddresses.get(0).getPort(), equalTo(9301)); + verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java deleted file mode 100644 index 859c0562924..00000000000 --- a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ /dev/null @@ -1,940 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.zen; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.Constants; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.network.NetworkAddress; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.TransportSettings; -import org.elasticsearch.transport.nio.MockNioTransport; -import org.junit.After; -import org.junit.Before; -import org.mockito.Matchers; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; -import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -public class UnicastZenPingTests extends ESTestCase { - - private ThreadPool threadPool; - private ExecutorService executorService; - // close in reverse order as opened - private Stack closeables; - - @Before - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool(getClass().getName()); - final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]"); - executorService = - EsExecutors.newScaling( - getClass().getName() + "/" + getTestName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); - closeables = new Stack<>(); - } - - @After - public void tearDown() throws Exception { - try { - logger.info("shutting down..."); - // JDK stack is broken, it does not iterate in the expected order (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4475301) - final List reverse = new ArrayList<>(); - while (!closeables.isEmpty()) { - reverse.add(closeables.pop()); - } - IOUtils.close(reverse); - } finally { - terminate(executorService); - terminate(threadPool); - super.tearDown(); - } - } - - public void testSimplePings() throws IOException, InterruptedException, ExecutionException { - // use ephemeral ports - final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); - final Settings settingsMismatch = - Settings.builder().put(settings).put("cluster.name", "mismatch").put(TransportSettings.PORT.getKey(), 0).build(); - - NetworkService networkService = new NetworkService(Collections.emptyList()); - - final BiFunction supplier = (s, v) -> new MockNioTransport( - s, - v, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()); - - NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); - closeables.push(handleA.transportService); - NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); - closeables.push(handleB.transportService); - NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier); - closeables.push(handleC.transportService); - final Version versionD; - if (randomBoolean()) { - versionD = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); - } else { - versionD = Version.CURRENT; - } - logger.info("UZP_D version set to [{}]", versionD); - NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier); - closeables.push(handleD.transportService); - - final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); - final ClusterState stateMismatch = ClusterState.builder(new ClusterName("mismatch")).version(randomNonNegativeLong()).build(); - - final Settings hostsSettings = Settings.builder() - .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey(), - NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), - NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), - NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())), - NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort()))) - .put("cluster.name", "test") - .build(); - - Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build(); - ClusterState stateA = ClusterState.builder(state) - .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) - .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) - .build(); - TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); - zenPingA.start(); - closeables.push(zenPingA); - - ClusterState stateB = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) - .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); - zenPingB.start(); - closeables.push(zenPingB); - - ClusterState stateC = ClusterState.builder(stateMismatch) - .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) - .build(); - TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, () -> stateC) { - @Override - protected Version getVersion() { - return versionD; - } - }; - zenPingC.start(); - closeables.push(zenPingC); - - ClusterState stateD = ClusterState.builder(stateMismatch) - .nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D")) - .build(); - TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, () -> stateD); - zenPingD.start(); - closeables.push(zenPingD); - - logger.info("ping from UZP_A"); - Collection pingResponses = zenPingA.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_B")); - assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - assertPings(handleA, handleB); - assertNoPings(handleA, handleC); // mismatch, shouldn't ping - assertNoPings(handleA, handleD); // mismatch, shouldn't ping - - // ping again, this time from B, - logger.info("ping from UZP_B"); - pingResponses = zenPingB.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_A")); - assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); - assertPings(handleB, handleA); - assertNoPings(handleB, handleC); // mismatch, shouldn't ping - assertNoPings(handleB, handleD); // mismatch, shouldn't ping - - logger.info("ping from UZP_C"); - pingResponses = zenPingC.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - assertNoPings(handleC, handleA); - assertNoPings(handleC, handleB); - assertPings(handleC, handleD); - - logger.info("ping from UZP_D"); - pingResponses = zenPingD.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - assertNoPings(handleD, handleA); - assertNoPings(handleD, handleB); - assertPings(handleD, handleC); - - zenPingC.close(); - handleD.counters.clear(); - logger.info("ping from UZP_D after closing UZP_C"); - pingResponses = zenPingD.pingAndWait().toList(); - // check that node does not respond to pings anymore after the ping service has been closed - assertThat(pingResponses.size(), equalTo(0)); - assertNoPings(handleD, handleA); - assertNoPings(handleD, handleB); - assertPings(handleD, handleC); - } - - public void testUnknownHostNotCached() throws ExecutionException, InterruptedException { - // use ephemeral ports - final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); - - final NetworkService networkService = new NetworkService(Collections.emptyList()); - - final Map addresses = new HashMap<>(); - final BiFunction supplier = (s, v) -> new MockNioTransport( - s, - v, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { - final TransportAddress[] transportAddresses = addresses.get(address); - if (transportAddresses == null) { - throw new UnknownHostException(address); - } else { - return transportAddresses; - } - } - }; - - final NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); - closeables.push(handleA.transportService); - final NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); - closeables.push(handleB.transportService); - final NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier); - closeables.push(handleC.transportService); - - addresses.put( - "UZP_A", - new TransportAddress[]{ - new TransportAddress( - new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort()))}); - addresses.put( - "UZP_C", - new TransportAddress[]{ - new TransportAddress( - new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))}); - - final Settings hostsSettings = Settings.builder() - .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey(), "UZP_A", "UZP_B", "UZP_C") - .put("cluster.name", "test") - .build(); - - final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); - - ClusterState stateA = ClusterState.builder(state) - .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) - .nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A")) - .build(); - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); - zenPingA.start(); - closeables.push(zenPingA); - - ClusterState stateB = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) - .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); - zenPingB.start(); - closeables.push(zenPingB); - - ClusterState stateC = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C")) - .build(); - TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, () -> stateC); - zenPingC.start(); - closeables.push(zenPingC); - - // the presence of an unresolvable host should not prevent resolvable hosts from being pinged - { - final Collection pingResponses = zenPingA.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_C")); - assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - assertNoPings(handleA, handleB); - assertPings(handleA, handleC); - assertNull(handleA.counters.get(handleB.address)); - } - - final HashMap moreThan = new HashMap<>(); - // we should see at least one ping to UZP_B, and one more ping than we have already seen to UZP_C - moreThan.put(handleB.address, 0); - moreThan.put(handleC.address, handleA.counters.get(handleC.address).intValue()); - - // now allow UZP_B to be resolvable - addresses.put( - "UZP_B", - new TransportAddress[]{ - new TransportAddress( - new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))}); - - // now we should see pings to UZP_B; this establishes that host resolutions are not cached - { - handleA.counters.clear(); - final Collection secondPingResponses = zenPingA.pingAndWait().toList(); - assertThat(secondPingResponses.size(), equalTo(2)); - final Set ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList())); - assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_B", "UZP_C")))); - assertPings(handleA, handleB); - assertPings(handleA, handleC); - } - } - - public void testPortLimit() throws InterruptedException { - final NetworkService networkService = new NetworkService(Collections.emptyList()); - final Transport transport = new MockNioTransport( - Settings.EMPTY, - Version.CURRENT, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)}, - new TransportAddress(InetAddress.getLoopbackAddress(), 9500) - ); - } - }; - closeables.push(transport); - final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, - Collections.emptySet()); - closeables.push(transportService); - final int limitPortCounts = randomIntBetween(1, 10); - final List transportAddresses = UnicastZenPing.resolveHostsLists( - executorService, - logger, - Collections.singletonList("127.0.0.1"), - limitPortCounts, - transportService, - TimeValue.timeValueSeconds(30)); - assertThat(transportAddresses, hasSize(limitPortCounts)); - final Set ports = new HashSet<>(); - for (final TransportAddress address : transportAddresses) { - assertTrue(address.address().getAddress().isLoopbackAddress()); - ports.add(address.getPort()); - } - assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet()))); - } - - public void testRemovingLocalAddresses() throws InterruptedException { - final NetworkService networkService = new NetworkService(Collections.emptyList()); - final InetAddress loopbackAddress = InetAddress.getLoopbackAddress(); - final Transport transport = new MockNioTransport( - Settings.EMPTY, - Version.CURRENT, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{ - new TransportAddress(loopbackAddress, 9300), - new TransportAddress(loopbackAddress, 9301) - }, - new TransportAddress(loopbackAddress, 9302) - ); - } - }; - closeables.push(transport); - final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, - Collections.emptySet()); - closeables.push(transportService); - final List transportAddresses = UnicastZenPing.resolveHostsLists( - executorService, - logger, - Collections.singletonList(NetworkAddress.format(loopbackAddress)), - 10, - transportService, - TimeValue.timeValueSeconds(30)); - assertThat(transportAddresses, hasSize(7)); - final Set ports = new HashSet<>(); - for (final TransportAddress address : transportAddresses) { - assertTrue(address.address().getAddress().isLoopbackAddress()); - ports.add(address.getPort()); - } - assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet()))); - } - - public void testUnknownHost() throws InterruptedException { - final Logger logger = mock(Logger.class); - final NetworkService networkService = new NetworkService(Collections.emptyList()); - final String hostname = randomAlphaOfLength(8); - final UnknownHostException unknownHostException = new UnknownHostException(hostname); - final Transport transport = new MockNioTransport( - Settings.EMPTY, - Version.CURRENT, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, - new TransportAddress(InetAddress.getLoopbackAddress(), 9300) - ); - } - - @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { - throw unknownHostException; - } - - }; - closeables.push(transport); - - final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, - Collections.emptySet()); - closeables.push(transportService); - - final List transportAddresses = UnicastZenPing.resolveHostsLists( - executorService, - logger, - Arrays.asList(hostname), - 1, - transportService, - TimeValue.timeValueSeconds(30) - ); - - assertThat(transportAddresses, empty()); - verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException); - } - - public void testResolveTimeout() throws InterruptedException { - final Logger logger = mock(Logger.class); - final NetworkService networkService = new NetworkService(Collections.emptyList()); - final CountDownLatch latch = new CountDownLatch(1); - final Transport transport = new MockNioTransport( - Settings.EMPTY, - Version.CURRENT, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)}, - new TransportAddress(InetAddress.getLoopbackAddress(), 9500) - ); - } - - @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { - if ("hostname1".equals(address)) { - return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; - } else if ("hostname2".equals(address)) { - try { - latch.await(); - return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } else { - throw new UnknownHostException(address); - } - } - - }; - closeables.push(transport); - - final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, - Collections.emptySet()); - closeables.push(transportService); - final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(3, 5)); - try { - final List transportAddresses = UnicastZenPing.resolveHostsLists( - executorService, - logger, - Arrays.asList("hostname1", "hostname2"), - 1, - transportService, - resolveTimeout); - - assertThat(transportAddresses, hasSize(1)); - verify(logger).trace( - "resolved host [{}] to {}", "hostname1", - new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); - verify(logger).warn("timed out after [{}] resolving host [{}]", resolveTimeout, "hostname2"); - verifyNoMoreInteractions(logger); - } finally { - latch.countDown(); - } - } - - public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException { - final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); - - NetworkService networkService = new NetworkService(Collections.emptyList()); - - final BiFunction supplier = (s, v) -> new MockNioTransport( - s, - Version.CURRENT, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()); - - NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); - closeables.push(handleA.transportService); - NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); - closeables.push(handleB.transportService); - - final boolean useHosts = randomBoolean(); - final Settings.Builder hostsSettingsBuilder = Settings.builder().put("cluster.name", "test"); - if (useHosts) { - hostsSettingsBuilder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey(), - NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())) - ); - } else { - hostsSettingsBuilder.put(DISCOVERY_SEED_HOSTS_SETTING.getKey(), (String) null); - } - final Settings hostsSettings = hostsSettingsBuilder.build(); - - final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); - - // connection to reuse - handleA.transportService.connectToNode(handleB.node); - - // install a listener to check that no new connections are made - handleA.transportService.addConnectionListener(new TransportConnectionListener() { - @Override - public void onConnectionOpened(Transport.Connection connection) { - fail("should not open any connections. got [" + connection.getNode() + "]"); - } - }); - - final ClusterState stateA = ClusterState.builder(state) - .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) - .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")) - .build(); - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); - zenPingA.start(); - closeables.push(zenPingA); - - final ClusterState stateB = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")) - .build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); - zenPingB.start(); - closeables.push(zenPingB); - - Collection pingResponses = zenPingA.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_B")); - assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - - } - - public void testPingingTemporalPings() throws ExecutionException, InterruptedException { - final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); - - NetworkService networkService = new NetworkService(Collections.emptyList()); - - final BiFunction supplier = (s, v) -> new MockNioTransport( - s, - v, - threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()); - - NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); - closeables.push(handleA.transportService); - NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); - closeables.push(handleB.transportService); - - final Settings hostsSettings = Settings.builder() - .put("cluster.name", "test") - .put(DISCOVERY_SEED_HOSTS_SETTING.getKey(), (String) null) // use nodes for simplicity - .build(); - - final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build(); - final ClusterState stateA = ClusterState.builder(state) - .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) - .nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build(); - - final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA); - zenPingA.start(); - closeables.push(zenPingA); - - // Node B doesn't know about A! - final ClusterState stateB = ClusterState.builder(state).nodes( - DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build(); - TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB); - zenPingB.start(); - closeables.push(zenPingB); - - { - logger.info("pinging from UZP_A so UZP_B will learn about it"); - Collection pingResponses = zenPingA.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_B")); - assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - } - { - logger.info("pinging from UZP_B"); - Collection pingResponses = zenPingB.pingAndWait().toList(); - assertThat(pingResponses.size(), equalTo(1)); - ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_A")); - assertThat(ping.getClusterStateVersion(), equalTo(-1L)); // A has a block - } - } - - public void testInvalidHosts() throws InterruptedException { - final Logger logger = mock(Logger.class); - final Transport transport = new MockNioTransport( - Settings.EMPTY, - Version.CURRENT, - threadPool, - new NetworkService(Collections.emptyList()), - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, - new TransportAddress(InetAddress.getLoopbackAddress(), 9300) - ); - } - }; - closeables.push(transport); - - final TransportService transportService = - new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, - Collections.emptySet()); - closeables.push(transportService); - final List transportAddresses = UnicastZenPing.resolveHostsLists( - executorService, - logger, - Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), - 1, - transportService, - TimeValue.timeValueSeconds(30)); - assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used - assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1")); - assertThat(transportAddresses.get(0).getPort(), equalTo(9301)); - verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); - } - - private void assertNoPings(final NetworkHandle fromNode, final NetworkHandle toNode) { - final AtomicInteger counter = fromNode.counters.getOrDefault(toNode.address, new AtomicInteger()); - final String onNodeName = fromNode.node.getName(); - assertNotNull("handle for [" + onNodeName + "] has no 'expected' counter", counter); - final String forNodeName = toNode.node.getName(); - assertThat("node [" + onNodeName + "] ping count to [" + forNodeName + "] is unexpected", - counter.get(), equalTo(0)); - } - - private void assertPings(final NetworkHandle fromNode, final NetworkHandle toNode) { - final AtomicInteger counter = fromNode.counters.getOrDefault(toNode.address, new AtomicInteger()); - final String onNodeName = fromNode.node.getName(); - assertNotNull("handle for [" + onNodeName + "] has no 'expected' counter", counter); - final String forNodeName = toNode.node.getName(); - if (Constants.WINDOWS) { - // Some of the ping attempts seem to sporadically fail on Windows (see https://github.com/elastic/elasticsearch/issues/28685) - // Anyhow, the point of the test is not to assert the exact number of pings, but to check if pinging has taken place or not - assertThat("node [" + onNodeName + "] ping count to [" + forNodeName + "] is unexpected", - counter.get(), greaterThan(0)); - } else { - assertThat("node [" + onNodeName + "] ping count to [" + forNodeName + "] is unexpected", - counter.get(), equalTo(3)); - } - - } - - private NetworkHandle startServices( - final Settings settings, - final ThreadPool threadPool, - final String nodeId, - final Version version, - final BiFunction supplier) { - return startServices(settings, threadPool, nodeId, version, supplier, emptySet()); - - } - - private NetworkHandle startServices( - final Settings settings, - final ThreadPool threadPool, - final String nodeId, - final Version version, - final BiFunction supplier, - final Set nodeRoles) { - final Settings nodeSettings = Settings.builder().put(settings) - .put("node.name", nodeId) - .put(TransportSettings.TRACE_LOG_INCLUDE_SETTING.getKey(), "internal:discovery/zen/unicast") - .build(); - final Transport transport = supplier.apply(nodeSettings, version); - final MockTransportService transportService = - new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> - new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null, - Collections.emptySet()); - transportService.start(); - transportService.acceptIncomingRequests(); - final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); - transportService.addTracer(new MockTransportService.Tracer() { - @Override - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); - counters.get(node.getAddress()).incrementAndGet(); - } - }); - return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, transportService.getLocalNode(), counters); - } - - private static class NetworkHandle { - - public final TransportAddress address; - public final TransportService transportService; - public final DiscoveryNode node; - public final ConcurrentMap counters; - - NetworkHandle( - final TransportAddress address, - final TransportService transportService, - final DiscoveryNode discoveryNode, - final ConcurrentMap counters) { - this.address = address; - this.transportService = transportService; - this.node = discoveryNode; - this.counters = counters; - } - } - - private static class TestUnicastZenPing extends UnicastZenPing { - - private static final Logger logger = LogManager.getLogger(TestUnicastZenPing.class); - - TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle, - PingContextProvider contextProvider) { - super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(), - threadPool, networkHandle.transportService, - new SettingsBasedSeedHostsProvider(settings, networkHandle.transportService), contextProvider); - } - - volatile CountDownLatch allTasksCompleted; - volatile AtomicInteger pendingTasks; - volatile CountDownLatch pingingRoundClosed; - - PingCollection pingAndWait() throws ExecutionException, InterruptedException { - allTasksCompleted = new CountDownLatch(1); - pingingRoundClosed = new CountDownLatch(1); - pendingTasks = new AtomicInteger(); - // mark the three sending rounds as ongoing - markTaskAsStarted("send pings"); - markTaskAsStarted("send pings"); - markTaskAsStarted("send pings"); - final AtomicReference response = new AtomicReference<>(); - ping(response::set, TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(30)); - pingingRoundClosed.await(); - final PingCollection result = response.get(); - assertNotNull("pinging didn't complete", result); - return result; - } - - @Override - protected void finishPingingRound(PingingRound pingingRound) { - // wait for all activity to finish before closing - try { - allTasksCompleted.await(); - } catch (InterruptedException e) { - // ok, finish anyway - } - super.finishPingingRound(pingingRound); - pingingRoundClosed.countDown(); - } - - @Override - protected void sendPings(TimeValue timeout, PingingRound pingingRound) { - super.sendPings(timeout, pingingRound); - markTaskAsCompleted("send pings"); - } - - @Override - protected void submitToExecutor(AbstractRunnable abstractRunnable) { - markTaskAsStarted("executor runnable"); - super.submitToExecutor(new AbstractRunnable() { - @Override - public void onRejection(Exception e) { - try { - super.onRejection(e); - } finally { - markTaskAsCompleted("executor runnable (rejected)"); - } - } - - @Override - public void onAfter() { - markTaskAsCompleted("executor runnable"); - } - - @Override - protected void doRun() throws Exception { - abstractRunnable.run(); - } - - @Override - public void onFailure(Exception e) { - // we shouldn't really end up here. - throw new AssertionError("unexpected error", e); - } - }); - } - - private void markTaskAsStarted(String task) { - logger.trace("task [{}] started. count [{}]", task, pendingTasks.incrementAndGet()); - } - - private void markTaskAsCompleted(String task) { - final int left = pendingTasks.decrementAndGet(); - logger.trace("task [{}] completed. count [{}]", task, left); - if (left == 0) { - allTasksCompleted.countDown(); - } - } - - @Override - protected TransportResponseHandler getPingResponseHandler(PingingRound pingingRound, DiscoveryNode node) { - markTaskAsStarted("ping [" + node + "]"); - TransportResponseHandler original = super.getPingResponseHandler(pingingRound, node); - return new TransportResponseHandler() { - @Override - public UnicastPingResponse read(StreamInput in) throws IOException { - return original.read(in); - } - - @Override - public void handleResponse(UnicastPingResponse response) { - original.handleResponse(response); - markTaskAsCompleted("ping [" + node + "]"); - } - - @Override - public void handleException(TransportException exp) { - original.handleException(exp); - markTaskAsCompleted("ping [" + node + "] (error)"); - } - - @Override - public String executor() { - return original.executor(); - } - }; - } - } - -} diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 084ba62c479..49163f9aa1f 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; +import org.elasticsearch.cluster.coordination.ValidateJoinRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -421,13 +422,13 @@ public class ZenDiscoveryUnitTests extends ESTestCase { .routingTable(RoutingTable.builder().add(indexRoutingTable).build()); if (incompatible) { IllegalStateException ex = expectThrows(IllegalStateException.class, () -> - request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null, null)); + request.messageReceived(new ValidateJoinRequest(stateBuilder.build()), null, null)); assertEquals("index [test] version not supported: " + VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion()) + " minimum compatible index version is: " + Version.CURRENT.minimumIndexCompatibilityVersion(), ex.getMessage()); } else { AtomicBoolean sendResponse = new AtomicBoolean(false); - request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() { + request.messageReceived(new ValidateJoinRequest(stateBuilder.build()), new TransportChannel() { @Override public String getProfileName() { diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index 06617a2bfdc..f431113183f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.indices.IndexClosedException; @@ -492,7 +491,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { final MetaData metaData = state.getMetaData(); final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder() .put(metaData.persistentSettings()).put("this.is.unknown", true) - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), "broken").build()).build(); + .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build(); internalCluster().fullRestart(new RestartCallback(){ @Override public Settings onNodeStopped(String nodeName) throws Exception { @@ -506,7 +505,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { state = client().admin().cluster().prepareState().get().getState(); assertEquals("true", state.metaData().persistentSettings().get("archived.this.is.unknown")); assertEquals("broken", state.metaData().persistentSettings().get("archived." - + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey())); + + MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey())); // delete these settings client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull("archived.*")).get(); @@ -514,7 +513,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { state = client().admin().cluster().prepareState().get().getState(); assertNull(state.metaData().persistentSettings().get("archived.this.is.unknown")); assertNull(state.metaData().persistentSettings().get("archived." - + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey())); + + MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey())); assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index e72bbf3a728..d34a78ebb9e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -81,7 +81,7 @@ import java.util.function.Supplier; * Matching requests to rules is based on the delegate address associated with the * discovery node of the request, namely by DiscoveryNode.getAddress(). * This address is usually the publish address of the node but can also be a different one - * (for example, @see org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing, which constructs + * (for example, @see org.elasticsearch.discovery.HandshakingTransportAddressConnector, which constructs * fake DiscoveryNode instances where the publish address is one of the bound addresses). */ public final class MockTransportService extends TransportService {