From 0e9186e1373b99e261d80317c836477b7cc31d66 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Dec 2016 15:09:58 +0100 Subject: [PATCH] Simplify Unicast Zen Ping (#22277) The `UnicastZenPing` shows its age and is the result of many small changes. The current state of affairs is confusing and is hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are: 1) Clear 3-round flow - no interleaving of scheduling. 2) The previous implementation made a best-effort attempt to wait for ongoing pings to be sent and completed. The pings were guaranteed to complete because each used the total ping duration as a timeout. This made it hard to reason about the total ping duration and the flow of the code. All of this is removed now and a ping should just complete within the given duration or not be counted (note that it was very handy for testing, but I moved the needed sync logic to the test). 3) Because of (2) the pinging scheduling changed a bit, to give a chance for the last round to complete. We now ping at the beginning, 1/3 and 2/3 of the duration. 4) To offset (3) a bit, incoming ping requests are now added to ongoing ping collections. 5) UnicastZenPing never establishes full-blown connections (but does reuse them if they already exist). Relates to #22120 6) Discovery host providers are only used once per pinging round. Closes #21739 7) Using the ability to open a connection without connecting to a node ( #22194 ) and shorter connection timeouts helps with connections piling up. Closes #19370 8) Beefed up the tests and sped them up. 
9) removed light profile from production code --- .../TransportClientNodesService.java | 21 +- .../discovery/zen/ElectMasterService.java | 3 +- .../discovery/zen/UnicastZenPing.java | 530 +++++++++--------- .../discovery/zen/ZenDiscovery.java | 24 +- .../elasticsearch/discovery/zen/ZenPing.java | 21 +- .../transport/ConnectionProfile.java | 28 +- .../elasticsearch/transport/Transport.java | 3 +- .../transport/TransportService.java | 27 - .../discovery/zen/UnicastZenPingTests.java | 446 ++++++++++++--- .../discovery/zen/ZenPingTests.java | 2 +- .../transport/TCPTransportTests.java | 4 +- .../TransportServiceHandshakeTests.java | 16 +- .../file/FileBasedUnicastHostsProvider.java | 6 +- .../FileBasedUnicastHostsProviderTests.java | 9 +- .../test/discovery/MockZenPing.java | 12 +- .../AbstractSimpleTransportTestCase.java | 8 +- .../transport/MockTcpTransport.java | 19 +- 17 files changed, 720 insertions(+), 459 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 350a35b6e49..b8600465b0e 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -101,6 +101,21 @@ final class TransportClientNodesService extends AbstractComponent implements Clo private final TransportClient.HostFailureListener hostFailureListener; + // TODO: migrate this to use low level connections and single type channels + /** {@link ConnectionProfile} to use when to connecting to the listed nodes and doing a liveness check */ + private static final ConnectionProfile LISTED_NODES_PROFILE; + + static { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + 
TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + LISTED_NODES_PROFILE = builder.build(); + } + TransportClientNodesService(Settings settings, TransportService transportService, ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) { super(settings); @@ -389,8 +404,8 @@ final class TransportClientNodesService extends AbstractComponent implements Clo if (!transportService.nodeConnected(listedNode)) { try { // its a listed node, light connect to it... - logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE); + logger.trace("connecting to listed node [{}]", listedNode); + transportService.connectToNode(listedNode, LISTED_NODES_PROFILE); } catch (Exception e) { logger.info( (Supplier) @@ -470,7 +485,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo } else { // its a listed node, light connect to it... logger.trace("connecting to listed node (light) [{}]", listedNode); - transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE); + transportService.connectToNode(listedNode, LISTED_NODES_PROFILE); } } catch (Exception e) { logger.debug( diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java index 7116597bdaf..f0de15da9ae 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java @@ -24,7 +24,6 @@ import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import 
org.elasticsearch.common.settings.Settings; @@ -174,7 +173,7 @@ public class ElectMasterService extends AbstractComponent { * Returns the given nodes sorted by likelihood of being elected as master, most likely first. * Non-master nodes are not removed but are rather put in the end */ - public static List sortByMasterLikelihood(Iterable nodes) { + static List sortByMasterLikelihood(Iterable nodes) { ArrayList sortedNodes = CollectionUtils.iterableAsArrayList(nodes); CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes); return sortedNodes; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 4a6006c1a04..8018c47b4e1 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -23,13 +23,12 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchException; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -44,10 +43,14 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; 
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transport.Connection; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -60,8 +63,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; @@ -70,18 +73,17 @@ import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -116,22 +118,19 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private 
volatile PingContextProvider contextProvider; - private final AtomicInteger pingHandlerIdGenerator = new AtomicInteger(); + private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger(); - // used to generate unique ids for nodes/address we temporarily connect to - private final AtomicInteger unicastNodeIdGenerator = new AtomicInteger(); - - // used as a node id prefix for nodes/address we temporarily connect to + // used as a node id prefix for configured unicast host nodes/address private static final String UNICAST_NODE_PREFIX = "#zen_unicast_"; - private final Map receivedResponses = newConcurrentMap(); + private final Map activePingingRounds = newConcurrentMap(); // a list of temporal responses a node will return for a request (holds responses from other nodes) private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final UnicastHostsProvider hostsProvider; - private final ExecutorService unicastZenPingExecutorService; + protected final EsThreadPoolExecutor unicastZenPingExecutorService; private final TimeValue resolveTimeout; @@ -146,15 +145,14 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { this.hostsProvider = unicastHostsProvider; this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - final List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); - if (hosts.isEmpty()) { + if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) { + configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); + // we only limit to 1 addresses, makes no sense to ping 100 ports + limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; + } else { // if unicast hosts are not specified, fill with simple defaults on the local machine configuredHosts = transportService.getLocalAddresses(); limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; - } else { - configuredHosts = hosts; - // we only limit to 1 addresses, makes no sense to ping 100 ports - 
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); logger.debug( @@ -164,7 +162,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { resolveTimeout); transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, - new UnicastPingRequestHandler()); + new UnicastPingRequestHandler()); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); unicastZenPingExecutorService = EsExecutors.newScaling( @@ -186,23 +184,23 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { * @param hosts the hosts to resolve * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service - * @param idGenerator the generator to supply unique ids for each discovery node + * @param nodeId_prefix a prefix to use for node ids * @param resolveTimeout the timeout before returning from hostname lookups * @return a list of discovery nodes with resolved transport addresses */ - public static List resolveDiscoveryNodes( + public static List resolveHostsLists( final ExecutorService executorService, final Logger logger, final List hosts, final int limitPortCounts, final TransportService transportService, - final Supplier idGenerator, + final String nodeId_prefix, final TimeValue resolveTimeout) throws InterruptedException { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); - Objects.requireNonNull(idGenerator); + Objects.requireNonNull(nodeId_prefix); Objects.requireNonNull(resolveTimeout); if (resolveTimeout.nanos() < 0) { throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); @@ -211,7 +209,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { 
final List> callables = hosts .stream() - .map(hn -> (Callable)() -> transportService.addressesFromString(hn, limitPortCounts)) + .map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts)) .collect(Collectors.toList()); final List> futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); @@ -226,11 +224,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { try { final TransportAddress[] addresses = future.get(); logger.trace("resolved host [{}] to {}", hostname, addresses); - for (final TransportAddress address : addresses) { + for (int addressId = 0; addressId < addresses.length; addressId++) { discoveryNodes.add( new DiscoveryNode( - idGenerator.get(), - address, + nodeId_prefix + hostname + "_" + addressId + "#", + addresses[addressId], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion())); @@ -249,8 +247,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { @Override public void close() { - ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS); - Releasables.close(receivedResponses.values()); + ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS); + Releasables.close(activePingingRounds.values()); closed = true; } @@ -266,106 +264,106 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { temporalResponses.clear(); } - // test only - Collection pingAndWait(TimeValue duration) { - final AtomicReference> response = new AtomicReference<>(); - final CountDownLatch latch = new CountDownLatch(1); - ping(pings -> { - response.set(pings); - latch.countDown(); - }, duration); - try { - latch.await(); - return response.get(); - } catch (InterruptedException e) { - return null; - } - } - /** - * Sends three rounds of pings notifying the specified {@link PingListener} when pinging is complete. 
Pings are sent after resolving + * Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}. * The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and * handshake is performed, with a connection timeout of the specified {@link TimeValue}. * - * @param listener the callback when pinging is complete - * @param duration the timeout for various components of the pings + * @param resultsConsumer the callback when pinging is complete + * @param duration the timeout for various components of the pings */ @Override - public void ping(final PingListener listener, final TimeValue duration) { - final List resolvedDiscoveryNodes; + public void ping(final Consumer resultsConsumer, final TimeValue duration) { + ping(resultsConsumer, duration, duration); + } + + /** + * a variant of {@link #ping(Consumer, TimeValue)}, but allows separating the scheduling duration + * from the duration used for request level time outs. 
This is useful for testing + */ + protected void ping(final Consumer resultsConsumer, + final TimeValue scheduleDuration, + final TimeValue requestDuration) { + final List seedNodes; try { - resolvedDiscoveryNodes = resolveDiscoveryNodes( + seedNodes = resolveHostsLists( unicastZenPingExecutorService, logger, configuredHosts, limitPortCounts, transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", + UNICAST_NODE_PREFIX, resolveTimeout); } catch (InterruptedException e) { throw new RuntimeException(e); } - final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); - try { - receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); - try { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); - } catch (RejectedExecutionException e) { - logger.debug("Ping execution rejected", e); - // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings - // But don't bail here, we can retry later on after the send ping has been scheduled. 
+ seedNodes.addAll(hostsProvider.buildDynamicNodes()); + final DiscoveryNodes nodes = contextProvider.nodes(); + // add all possible master nodes that were active in the last known cluster configuration + for (ObjectCursor masterNode : nodes.getMasterNodes().values()) { + seedNodes.add(masterNode.value); + } + + final ConnectionProfile connectionProfile = + ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration); + final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer, + nodes.getLocalNode(), connectionProfile); + activePingingRounds.put(pingingRound.id(), pingingRound); + final AbstractRunnable pingSender = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (e instanceof AlreadyClosedException == false) { + logger.warn("unexpected error while pinging", e); + } } - threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - protected void doRun() { - sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); - threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); - sendPingsHandler.close(); - listener.onPing(sendPingsHandler.pingCollection().toList()); - for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { - logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); - transportService.disconnectFromNode(node); - } - } + @Override + protected void doRun() throws Exception { + sendPings(requestDuration, pingingRound); + } + }; + threadPool.generic().execute(pingSender); + threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, 
pingSender); + threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender); + threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + finishPingingRound(pingingRound); + } - @Override - public void onFailure(Exception e) { - logger.debug("Ping execution failed", e); - sendPingsHandler.close(); - } - }); - } - - @Override - public void onFailure(Exception e) { - logger.debug("Ping execution failed", e); - sendPingsHandler.close(); - } - }); - } catch (EsRejectedExecutionException ex) { // TODO: remove this once ScheduledExecutor has support for AbstractRunnable - sendPingsHandler.close(); - // we are shutting down - } catch (Exception e) { - sendPingsHandler.close(); - throw new ElasticsearchException("Ping execution failed", e); - } + @Override + public void onFailure(Exception e) { + logger.warn("unexpected error while finishing pinging round", e); + } + }); } - class SendPingsHandler implements Releasable { + // for testing + protected void finishPingingRound(PingingRound pingingRound) { + pingingRound.close(); + } + + protected class PingingRound implements Releasable { private final int id; - private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); + private final Map tempConnections = new HashMap<>(); + private final KeyedLock connectionLock = new KeyedLock<>(true); private final PingCollection pingCollection; + private final List seedNodes; + private final Consumer pingListener; + private final DiscoveryNode localNode; + private final ConnectionProfile connectionProfile; private AtomicBoolean closed = new AtomicBoolean(false); - SendPingsHandler(int id) { + PingingRound(int id, List seedNodes, Consumer resultsConsumer, DiscoveryNode localNode, + ConnectionProfile connectionProfile) { this.id = id; + this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); + 
this.pingListener = resultsConsumer; + this.localNode = localNode; + this.connectionProfile = connectionProfile; this.pingCollection = new PingCollection(); } @@ -377,154 +375,170 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { return this.closed.get(); } - public PingCollection pingCollection() { - return pingCollection; + public List getSeedNodes() { + ensureOpen(); + return seedNodes; + } + + public Connection getOrConnect(DiscoveryNode node) throws IOException { + Connection result; + try (Releasable ignore = connectionLock.acquire(node.getAddress())) { + result = tempConnections.get(node.getAddress()); + if (result == null) { + boolean success = false; + result = transportService.openConnection(node, connectionProfile); + try { + transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis()); + synchronized (this) { + // acquire lock to prevent concurrent closing + Connection existing = tempConnections.put(node.getAddress(), result); + assert existing == null; + success = true; + } + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(result); + } + } + } + } + return result; + } + + private void ensureOpen() { + if (isClosed()) { + throw new AlreadyClosedException("pinging round [" + id + "] is finished"); + } + } + + public void addPingResponseToCollection(PingResponse pingResponse) { + if (localNode.equals(pingResponse.node()) == false) { + pingCollection.addPing(pingResponse); + } } @Override public void close() { - if (closed.compareAndSet(false, true)) { - receivedResponses.remove(id); + List toClose = null; + synchronized (this) { + if (closed.compareAndSet(false, true)) { + activePingingRounds.remove(id); + toClose = new ArrayList<>(tempConnections.values()); + tempConnections.clear(); + } } + if (toClose != null) { + // we actually closed + try { + pingListener.accept(pingCollection); + } finally { + IOUtils.closeWhileHandlingException(toClose); + } + } + } + + public 
ConnectionProfile getConnectionProfile() { + return connectionProfile; } } - void sendPings( - final TimeValue timeout, - @Nullable TimeValue waitTime, - final SendPingsHandler sendPingsHandler, - final List resolvedDiscoveryNodes) { + protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); - pingRequest.id = sendPingsHandler.id(); + pingRequest.id = pingingRound.id(); pingRequest.timeout = timeout; DiscoveryNodes discoNodes = contextProvider.nodes(); pingRequest.pingResponse = createPingResponse(discoNodes); - HashSet nodesToPingSet = new HashSet<>(); - for (PingResponse temporalResponse : temporalResponses) { - // Only send pings to nodes that have the same cluster name. - if (clusterName.equals(temporalResponse.clusterName())) { - nodesToPingSet.add(temporalResponse.node()); - } - } - nodesToPingSet.addAll(hostsProvider.buildDynamicNodes()); + Set nodesFromResponses = temporalResponses.stream().map(pingResponse -> { + assert clusterName.equals(pingResponse.clusterName()) : + "got a ping request from a different cluster. 
expected " + clusterName + " got " + pingResponse.clusterName(); + return pingResponse.node(); + }).collect(Collectors.toSet()); - // add all possible master nodes that were active in the last known cluster configuration - for (ObjectCursor masterNode : discoNodes.getMasterNodes().values()) { - nodesToPingSet.add(masterNode.value); - } + // dedup by address + final Map uniqueNodesByAddress = + Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream()) + .collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1)); - // sort the nodes by likelihood of being an active master - List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); - // add the configured hosts first - final List nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); - nodesToPing.addAll(resolvedDiscoveryNodes); - nodesToPing.addAll(sortedNodesToPing); - - final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); - for (final DiscoveryNode node : nodesToPing) { - // make sure we are connected - final boolean nodeFoundByAddress; - DiscoveryNode nodeToSend = discoNodes.findByAddress(node.getAddress()); - if (nodeToSend != null) { - nodeFoundByAddress = true; - } else { - nodeToSend = node; - nodeFoundByAddress = false; - } - - if (!transportService.nodeConnected(nodeToSend)) { - if (sendPingsHandler.isClosed()) { - return; + // resolve what we can via the latest cluster state + final Set nodesToPing = uniqueNodesByAddress.values().stream() + .map(node -> { + DiscoveryNode foundNode = discoNodes.findByAddress(node.getAddress()); + if (foundNode == null) { + return node; + } else { + return foundNode; } - // if we find on the disco nodes a matching node by address, we are going to restore the connection - // anyhow down the line if its not connected... - // if we can't resolve the node, we don't know and we have to clean up after pinging. 
We do have - // to make sure we don't disconnect a true node which was temporarily removed from the DiscoveryNodes - // but will be added again during the pinging. We therefore create a new temporary node - if (!nodeFoundByAddress) { - if (!nodeToSend.getId().startsWith(UNICAST_NODE_PREFIX)) { - DiscoveryNode tempNode = new DiscoveryNode("", - UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.getId() + "#", - UUIDs.randomBase64UUID(), nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.getAddress(), - nodeToSend.getAttributes(), nodeToSend.getRoles(), nodeToSend.getVersion()); + }).collect(Collectors.toSet()); - logger.trace("replacing {} with temp node {}", nodeToSend, tempNode); - nodeToSend = tempNode; - } - sendPingsHandler.nodeToDisconnect.add(nodeToSend); - } - // fork the connection to another thread - final DiscoveryNode finalNodeToSend = nodeToSend; - unicastZenPingExecutorService.execute(new Runnable() { - @Override - public void run() { - if (sendPingsHandler.isClosed()) { - return; - } - boolean success = false; - try { - // connect to the node, see if we manage to do it, if not, bail - if (!nodeFoundByAddress) { - logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis()); - } else { - logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNode(finalNodeToSend); - } - logger.trace("[{}] connected to {}", sendPingsHandler.id(), node); - if (receivedResponses.containsKey(sendPingsHandler.id())) { - // we are connected and still in progress, send the ping request - sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); - } else { - // connect took too long, just log it and bail - latch.countDown(); - logger.trace("[{}] connect to {} was too long outside of ping window, bailing", - sendPingsHandler.id(), 
node); - } - success = true; - } catch (ConnectTransportException e) { - // can't connect to the node - this is a more common path! - logger.trace( - (Supplier) () -> new ParameterizedMessage( - "[{}] failed to connect to {}", sendPingsHandler.id(), finalNodeToSend), e); - } catch (RemoteTransportException e) { - // something went wrong on the other side - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "[{}] received a remote error as a response to ping {}", sendPingsHandler.id(), finalNodeToSend), e); - } catch (Exception e) { - logger.warn( - (Supplier) () -> new ParameterizedMessage( - "[{}] failed send ping to {}", sendPingsHandler.id(), finalNodeToSend), e); - } finally { - if (!success) { - latch.countDown(); - } - } - } - }); - } else { - sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); - } - } - if (waitTime != null) { - try { - latch.await(waitTime.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } - } + nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest)); } - private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, - final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { - logger.trace("[{}] sending to {}", id, nodeToSend); - transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder() - .withTimeout((long) (timeout.millis() * 1.25)).build(), new TransportResponseHandler() { + private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound, + final UnicastPingRequest pingRequest) { + submitToExecutor(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + Connection connection = null; + if (transportService.nodeConnected(node)) { + try { + // concurrency can still cause disconnects + connection = 
transportService.getConnection(node); + } catch (NodeNotConnectedException e) { + logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node); + } + } + + if (connection == null) { + connection = pingingRound.getOrConnect(node); + } + + logger.trace("[{}] sending to {}", pingingRound.id(), node); + transportService.sendRequest(connection, ACTION_NAME, pingRequest, + TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), + getPingResponseHandler(pingingRound, node)); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) { + // can't connect to the node - this is more common path! + logger.trace( + (Supplier) () -> new ParameterizedMessage( + "[{}] failed to ping {}", pingingRound.id(), node), e); + } else if (e instanceof RemoteTransportException) { + // something went wrong on the other side + logger.debug( + (Supplier) () -> new ParameterizedMessage( + "[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e); + } else { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "[{}] failed send ping to {}", pingingRound.id(), node), e); + } + } + + @Override + public void onRejection(Exception e) { + // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings + // But don't bail here, we can retry later on after the send ping has been scheduled. 
+ logger.debug("Ping execution rejected", e); + } + }); + } + + // for testing + protected void submitToExecutor(AbstractRunnable abstractRunnable) { + unicastZenPingExecutorService.execute(abstractRunnable); + } + + // for testing + protected TransportResponseHandler getPingResponseHandler(final PingingRound pingingRound, + final DiscoveryNode node) { + return new TransportResponseHandler() { @Override public UnicastPingResponse newInstance() { @@ -538,50 +552,36 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { @Override public void handleResponse(UnicastPingResponse response) { - logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses)); - try { - DiscoveryNodes discoveryNodes = contextProvider.nodes(); - for (PingResponse pingResponse : response.pingResponses) { - if (pingResponse.node().equals(discoveryNodes.getLocalNode())) { - // that's us, ignore - continue; - } - SendPingsHandler sendPingsHandler = receivedResponses.get(response.id); - if (sendPingsHandler == null) { - if (!closed) { - // Only log when we're not closing the node. Having no send ping handler is then expected - logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id); - } - } else { - sendPingsHandler.pingCollection().addPing(pingResponse); - } + logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses)); + if (pingingRound.isClosed()) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] skipping received response from {}. 
already closed", pingingRound.id(), node); } - } finally { - latch.countDown(); + } else { + Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection); } } @Override public void handleException(TransportException exp) { - latch.countDown(); - if (exp instanceof ConnectTransportException) { + if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { // ok, not connected... - logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp); - } else { + logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", node), exp); + } else if (closed == false) { logger.warn((Supplier) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp); } } - }); + }; } private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { + assert clusterName.equals(request.pingResponse.clusterName()) : + "got a ping request from a different cluster. expected " + clusterName + " got " + request.pingResponse.clusterName(); temporalResponses.add(request.pingResponse); - threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() { - @Override - public void run() { - temporalResponses.remove(request.pingResponse); - } - }); + // add to any ongoing pinging + activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse)); + threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, + () -> temporalResponses.remove(request.pingResponse)); List pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses); pingResponses.add(createPingResponse(contextProvider.nodes())); @@ -601,11 +601,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { channel.sendResponse(handlePingRequest(request)); } else { throw new IllegalStateException( - String.format( - Locale.ROOT, - "mismatched 
cluster names; request: [%s], local: [%s]", - request.pingResponse.clusterName().value(), - clusterName.value())); + String.format( + Locale.ROOT, + "mismatched cluster names; request: [%s], local: [%s]", + request.pingResponse.clusterName().value(), + clusterName.value())); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 16ca16d4a90..b0adf1696ee 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -67,11 +67,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -1021,24 +1021,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } private ZenPing.PingCollection pingAndWait(TimeValue timeout) { - final ZenPing.PingCollection response = new ZenPing.PingCollection(); - final CountDownLatch latch = new CountDownLatch(1); + final CompletableFuture response = new CompletableFuture<>(); try { - zenPing.ping(pings -> { - response.addPings(pings); - latch.countDown(); - }, timeout); + zenPing.ping(response::complete, timeout); } catch (Exception ex) { - logger.warn("Ping execution failed", ex); - latch.countDown(); + // logged later + response.completeExceptionally(ex); } try { - latch.await(); - return response; + return response.get(); } catch (InterruptedException e) { logger.trace("pingAndWait interrupted"); - return response; + return new ZenPing.PingCollection(); + 
} catch (ExecutionException e) { + logger.warn("Ping execution failed", e); + return new ZenPing.PingCollection(); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index 75ea701dc99..622c4649db2 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -30,11 +30,11 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -42,17 +42,7 @@ public interface ZenPing extends Releasable { void start(PingContextProvider contextProvider); - void ping(PingListener listener, TimeValue timeout); - - interface PingListener { - - /** - * called when pinging is done. - * - * @param pings ping result *must - */ - void onPing(Collection pings); - } + void ping(Consumer resultsConsumer, TimeValue timeout); class PingResponse implements Streamable { @@ -191,13 +181,6 @@ public interface ZenPing extends Releasable { return false; } - /** adds multiple pings if newer than previous pings from the same node */ - public synchronized void addPings(Iterable pings) { - for (PingResponse ping : pings) { - addPing(ping); - } - } - /** serialize current pings to a list. 
It is guaranteed that the list contains one ping response per node */ public synchronized List toList() { return new ArrayList<>(pings.values()); diff --git a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index 92421adea6a..8c14c6b5c9d 100644 --- a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.unit.TimeValue; import java.util.ArrayList; @@ -35,16 +36,25 @@ import java.util.concurrent.atomic.AtomicInteger; public final class ConnectionProfile { /** - * A pre-built light connection profile that shares a single connection across all - * types. + * Builds a connection profile that is dedicated to a single channel type. Use this + * when opening single use connections */ - public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile( - Collections.singletonList(new ConnectionTypeHandle(0, 1, EnumSet.of( - TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.PING, - TransportRequestOptions.Type.RECOVERY, - TransportRequestOptions.Type.REG, - TransportRequestOptions.Type.STATE))), 1, null, null); + public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, + @Nullable TimeValue connectTimeout, + @Nullable TimeValue handshakeTimeout) { + Builder builder = new Builder(); + builder.addConnections(1, channelType); + final EnumSet otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class); + otherTypes.remove(channelType); + builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new)); + if (connectTimeout != null) { + builder.setConnectTimeout(connectTimeout); + } + if (handshakeTimeout != null) { + 
builder.setHandshakeTimeout(handshakeTimeout); + } + return builder.build(); + } private final List handles; private final int numConnections; diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index e337aaf41b2..44c72e1f548 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -63,8 +63,7 @@ public interface Transport extends LifecycleComponent { boolean nodeConnected(DiscoveryNode node); /** - * Connects to a node with the given connection profile. Use {@link ConnectionProfile#LIGHT_PROFILE} when just connecting for ping - * and then disconnecting. If the node is already connected this method has no effect + * Connects to a node with the given connection profile. If the node is already connected this method has no effect */ void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 8884177ba63..e76210ff195 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -62,7 +62,6 @@ import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -328,32 +327,6 @@ public class TransportService extends AbstractLifecycleComponent { } } - /** - * Lightly connect to the specified node, returning updated node - * information. 
The handshake will fail if the cluster name on the - * target node mismatches the local cluster name and - * {@code checkClusterName} is {@code true}. - * - * @param node the node to connect to - * @param handshakeTimeout handshake timeout - * @return the connected node - * @throws ConnectTransportException if the connection failed - * @throws IllegalStateException if the handshake failed - */ - public DiscoveryNode connectToNodeAndHandshake( - final DiscoveryNode node, - final long handshakeTimeout) throws IOException { - if (node.equals(localNode)) { - return localNode; - } - DiscoveryNode handshakeNode; - try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) { - handshakeNode = handshake(connection, handshakeTimeout); - } - connectToNode(node, ConnectionProfile.LIGHT_PROFILE); - return handshakeNode; - } - /** * Executes a high-level handshake using the given connection * and returns the discovery node of the node the connection diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index de8d1a562e8..84650f042cf 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkAddress; @@ -34,18 +35,22 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; 
import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.MockTcpTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.junit.After; @@ -60,12 +65,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -82,7 +89,6 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static 
org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -124,8 +130,7 @@ public class UnicastZenPingTests extends ESTestCase { private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList; - @TestLogging("org.elasticsearch.transport:TRACE,org.elasticsearch.discovery.zen.UnicastZenPing:TRACE") - public void testSimplePings() throws IOException, InterruptedException { + public void testSimplePings() throws IOException, InterruptedException, ExecutionException { // use ephemeral ports final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); final Settings settingsMismatch = @@ -140,7 +145,12 @@ public class UnicastZenPingTests extends ESTestCase { new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()), networkService, - v); + v) { + @Override + public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) { + throw new AssertionError("zen pings should never connect to node (got [" + node + "])"); + } + }; NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); closeables.push(handleA.transportService); @@ -148,25 +158,30 @@ public class UnicastZenPingTests extends ESTestCase { closeables.push(handleB.transportService); NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier); closeables.push(handleC.transportService); - // just fake that no versions are compatible with this node - Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()); - Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); + final Version versionD; + if (randomBoolean()) { + versionD = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); + } else { + versionD = Version.CURRENT; + } + 
logger.info("UZP_D version set to [{}]", versionD); NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier); closeables.push(handleD.transportService); final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + final ClusterState stateMismatch = ClusterState.builder(new ClusterName("mismatch")).version(randomPositiveLong()).build(); Settings hostsSettings = Settings.builder() - .putArray("discovery.zen.ping.unicast.hosts", + .putArray("discovery.zen.ping.unicast.hosts", NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())), NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())), NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort()))) - .put("cluster.name", "test") - .build(); + .put("cluster.name", "test") + .build(); Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); + TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); zenPingA.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { @@ -180,7 +195,7 @@ public class UnicastZenPingTests extends ESTestCase { }); closeables.push(zenPingA); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); zenPingB.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { 
@@ -194,7 +209,8 @@ public class UnicastZenPingTests extends ESTestCase { }); closeables.push(zenPingB); - UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) { + TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, + EMPTY_HOSTS_PROVIDER) { @Override protected Version getVersion() { return versionD; @@ -208,12 +224,13 @@ public class UnicastZenPingTests extends ESTestCase { @Override public ClusterState clusterState() { - return state; + return stateMismatch; } }); closeables.push(zenPingC); - UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER); + TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, + EMPTY_HOSTS_PROVIDER); zenPingD.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { @@ -222,40 +239,48 @@ public class UnicastZenPingTests extends ESTestCase { @Override public ClusterState clusterState() { - return state; + return stateMismatch; } }); closeables.push(zenPingD); logger.info("ping from UZP_A"); - Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueMillis(500)); + Collection pingResponses = zenPingA.pingAndWait().toList(); assertThat(pingResponses.size(), equalTo(1)); ZenPing.PingResponse ping = pingResponses.iterator().next(); assertThat(ping.node().getId(), equalTo("UZP_B")); assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - assertCountersMoreThan(handleA, handleB, handleC, handleD); + assertPingCount(handleA, handleB, 3); + assertPingCount(handleA, handleC, 0); // mismatch, shouldn't ping + assertPingCount(handleA, handleD, 0); // mismatch, shouldn't ping // ping again, this time from B, logger.info("ping from UZP_B"); - pingResponses = zenPingB.pingAndWait(TimeValue.timeValueMillis(500)); + pingResponses = zenPingB.pingAndWait().toList(); 
assertThat(pingResponses.size(), equalTo(1)); ping = pingResponses.iterator().next(); assertThat(ping.node().getId(), equalTo("UZP_A")); assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); - assertCountersMoreThan(handleB, handleA, handleC, handleD); + assertPingCount(handleB, handleA, 3); + assertPingCount(handleB, handleC, 0); // mismatch, shouldn't ping + assertPingCount(handleB, handleD, 0); // mismatch, shouldn't ping logger.info("ping from UZP_C"); - pingResponses = zenPingC.pingAndWait(TimeValue.timeValueMillis(500)); - assertThat(pingResponses.size(), equalTo(0)); - assertCountersMoreThan(handleC, handleA, handleB, handleD); + pingResponses = zenPingC.pingAndWait().toList(); + assertThat(pingResponses.size(), equalTo(1)); + assertPingCount(handleC, handleA, 0); + assertPingCount(handleC, handleB, 0); + assertPingCount(handleC, handleD, 3); logger.info("ping from UZP_D"); - pingResponses = zenPingD.pingAndWait(TimeValue.timeValueMillis(500)); - assertThat(pingResponses.size(), equalTo(0)); - assertCountersMoreThan(handleD, handleA, handleB, handleC); + pingResponses = zenPingD.pingAndWait().toList(); + assertThat(pingResponses.size(), equalTo(1)); + assertPingCount(handleD, handleA, 0); + assertPingCount(handleD, handleB, 0); + assertPingCount(handleD, handleC, 3); } - public void testUnknownHostNotCached() { + public void testUnknownHostNotCached() throws ExecutionException, InterruptedException { // use ephemeral ports final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); @@ -306,7 +331,7 @@ public class UnicastZenPingTests extends ESTestCase { final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); - final UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); + final TestUnicastZenPing zenPingA = new 
TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); zenPingA.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { @@ -320,7 +345,7 @@ public class UnicastZenPingTests extends ESTestCase { }); closeables.push(zenPingA); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); zenPingB.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { @@ -334,7 +359,7 @@ public class UnicastZenPingTests extends ESTestCase { }); closeables.push(zenPingB); - UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER); + TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER); zenPingC.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { @@ -350,12 +375,13 @@ public class UnicastZenPingTests extends ESTestCase { // the presence of an unresolvable host should not prevent resolvable hosts from being pinged { - final Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueMillis(500)); + final Collection pingResponses = zenPingA.pingAndWait().toList(); assertThat(pingResponses.size(), equalTo(1)); ZenPing.PingResponse ping = pingResponses.iterator().next(); assertThat(ping.node().getId(), equalTo("UZP_C")); assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - assertCountersMoreThan(handleA, handleC); + assertPingCount(handleA, handleB, 0); + assertPingCount(handleA, handleC, 3); assertNull(handleA.counters.get(handleB.address)); } @@ -373,11 +399,13 @@ public class UnicastZenPingTests extends ESTestCase { // now we should see pings to UZP_B; this establishes that host resolutions are not cached { - final Collection secondPingResponses = 
zenPingA.pingAndWait(TimeValue.timeValueMillis(500)); + handleA.counters.clear(); + final Collection secondPingResponses = zenPingA.pingAndWait().toList(); assertThat(secondPingResponses.size(), equalTo(2)); final Set ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList())); assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_B", "UZP_C")))); - assertCountersMoreThan(moreThan, handleA, handleB, handleC); + assertPingCount(handleA, handleB, 3); + assertPingCount(handleA, handleC, 3); } } @@ -395,15 +423,14 @@ public class UnicastZenPingTests extends ESTestCase { final TransportService transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); closeables.push(transportService); - final AtomicInteger idGenerator = new AtomicInteger(); final int limitPortCounts = randomIntBetween(1, 10); - final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, logger, Collections.singletonList("127.0.0.1"), limitPortCounts, transportService, - () -> Integer.toString(idGenerator.incrementAndGet()), + "test_", TimeValue.timeValueSeconds(1)); assertThat(discoveryNodes, hasSize(limitPortCounts)); final Set ports = new HashSet<>(); @@ -439,15 +466,14 @@ public class UnicastZenPingTests extends ESTestCase { final TransportService transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); closeables.push(transportService); - final AtomicInteger idGenerator = new AtomicInteger(); - final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList(hostname), 1, transportService, - () -> Integer.toString(idGenerator.incrementAndGet()), + "test_", TimeValue.timeValueSeconds(1) ); @@ -490,16 +516,15 @@ 
public class UnicastZenPingTests extends ESTestCase { final TransportService transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); closeables.push(transportService); - final AtomicInteger idGenerator = new AtomicInteger(); final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3)); try { - final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("hostname1", "hostname2"), 1, transportService, - () -> Integer.toString(idGenerator.incrementAndGet()), + "test+", resolveTimeout); assertThat(discoveryNodes, hasSize(1)); @@ -513,6 +538,156 @@ public class UnicastZenPingTests extends ESTestCase { } } + public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException { + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); + closeables.push(handleA.transportService); + NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); + closeables.push(handleB.transportService); + + final boolean useHosts = randomBoolean(); + final Settings.Builder hostsSettingsBuilder = Settings.builder().put("cluster.name", "test"); + if (useHosts) { + hostsSettingsBuilder.putArray("discovery.zen.ping.unicast.hosts", + NetworkAddress.format(new 
InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())) + ); + } else { + hostsSettingsBuilder.put("discovery.zen.ping.unicast.hosts", (String) null); + } + final Settings hostsSettings = hostsSettingsBuilder.build(); + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + // connection to reuse + handleA.transportService.connectToNode(handleB.node); + + // install a listener to check that no new connections are made + handleA.transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onConnectionOpened(DiscoveryNode node) { + fail("should not open any connections. got [" + node + "]"); + } + }); + + final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); + zenPingB.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingB); + + Collection pingResponses = zenPingA.pingAndWait().toList(); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); + + } + + public void 
testPingingTemporalPings() throws ExecutionException, InterruptedException { + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); + closeables.push(handleA.transportService); + NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class)); + closeables.push(handleB.transportService); + + final Settings hostsSettings = Settings.builder() + .put("cluster.name", "test") + .put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity + .build(); + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + // Node B doesn't know about A! 
+ TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER); + zenPingB.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingB); + + { + logger.info("pinging from UZP_A so UZP_B will learn about it"); + Collection pingResponses = zenPingA.pingAndWait().toList(); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); + } + { + logger.info("pinging from UZP_B"); + Collection pingResponses = zenPingB.pingAndWait().toList(); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_A")); + assertThat(ping.getClusterStateVersion(), equalTo(-1L)); // A has a block + } + } + public void testInvalidHosts() throws InterruptedException { final Logger logger = mock(Logger.class); final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); @@ -529,14 +704,13 @@ public class UnicastZenPingTests extends ESTestCase { final TransportService transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); closeables.push(transportService); - final AtomicInteger idGenerator = new AtomicInteger(); - final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), 1, transportService, - () -> Integer.toString(idGenerator.incrementAndGet()), + "test_", TimeValue.timeValueSeconds(1)); 
assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1")); @@ -544,24 +718,13 @@ public class UnicastZenPingTests extends ESTestCase { verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); } - // assert that we tried to ping each of the configured nodes at least once - private void assertCountersMoreThan(final NetworkHandle that, final NetworkHandle...handles) { - final HashMap moreThan = new HashMap<>(); - for (final NetworkHandle handle : handles) { - assert handle != that; - moreThan.put(handle.address, 0); - } - assertCountersMoreThan(moreThan, that, handles); - } - - private void assertCountersMoreThan( - final Map moreThan, - final NetworkHandle that, - final NetworkHandle... handles) { - for (final NetworkHandle handle : handles) { - assert handle != that; - assertThat(that.counters.get(handle.address).get(), greaterThan(moreThan.get(handle.address))); - } + private void assertPingCount(final NetworkHandle fromNode, final NetworkHandle toNode, int expectedCount) { + final AtomicInteger counter = fromNode.counters.getOrDefault(toNode.address, new AtomicInteger()); + final String onNodeName = fromNode.node.getName(); + assertNotNull("handle for [" + onNodeName + "] has no 'expected' counter", counter); + final String forNodeName = toNode.node.getName(); + assertThat("node [" + onNodeName + "] ping count to [" + forNodeName + "] is unexpected", + counter.get(), equalTo(expectedCount)); } private NetworkHandle startServices( @@ -570,31 +733,36 @@ public class UnicastZenPingTests extends ESTestCase { final String nodeId, final Version version, final BiFunction supplier) { - final Transport transport = supplier.apply(settings, version); - final TransportService transportService = - new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + 
return startServices(settings, threadPool, nodeId, version, supplier, emptySet()); + + } + + private NetworkHandle startServices( + final Settings settings, + final ThreadPool threadPool, + final String nodeId, + final Version version, + final BiFunction supplier, + final Set nodeRoles) { + final Settings nodeSettings = Settings.builder().put(settings) + .put("node.name", nodeId) + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "internal:discovery/zen/unicast") + .build(); + final Transport transport = supplier.apply(nodeSettings, version); + final MockTransportService transportService = + new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); - transportService.addConnectionListener(new TransportConnectionListener() { - + transportService.addTracer(new MockTransportService.Tracer() { @Override - public void onNodeConnected(DiscoveryNode node) { - } - - @Override - public void onConnectionOpened(DiscoveryNode node) { + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); counters.get(node.getAddress()).incrementAndGet(); } - - @Override - public void onNodeDisconnected(DiscoveryNode node) { - } - }); final DiscoveryNode node = - new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version); + new DiscoveryNode(nodeId, nodeId, transportService.boundAddress().publishAddress(), emptyMap(), nodeRoles, version); transportService.setLocalNode(node); return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters); } @@ -616,7 +784,123 @@ public class UnicastZenPingTests extends ESTestCase { this.node = discoveryNode; this.counters = counters; } + } + 
private static class TestUnicastZenPing extends UnicastZenPing { + + public TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle, + UnicastHostsProvider unicastHostsProvider) { + super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(), + threadPool, networkHandle.transportService, unicastHostsProvider); + } + + volatile CountDownLatch allTasksCompleted; + volatile AtomicInteger pendingTasks; + + PingCollection pingAndWait() throws ExecutionException, InterruptedException { + allTasksCompleted = new CountDownLatch(1); + pendingTasks = new AtomicInteger(); + // make the three sending rounds to come as started + markTaskAsStarted("send pings"); + markTaskAsStarted("send pings"); + markTaskAsStarted("send pings"); + final CompletableFuture response = new CompletableFuture<>(); + try { + ping(response::complete, TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(1)); + } catch (Exception ex) { + response.completeExceptionally(ex); + } + return response.get(); + } + + @Override + protected void finishPingingRound(PingingRound pingingRound) { + // wait for all activity to finish before closing + try { + allTasksCompleted.await(); + } catch (InterruptedException e) { + // ok, finish anyway + } + super.finishPingingRound(pingingRound); + } + + @Override + protected void sendPings(TimeValue timeout, PingingRound pingingRound) { + super.sendPings(timeout, pingingRound); + markTaskAsCompleted("send pings"); + } + + @Override + protected void submitToExecutor(AbstractRunnable abstractRunnable) { + markTaskAsStarted("executor runnable"); + super.submitToExecutor(new AbstractRunnable() { + @Override + public void onRejection(Exception e) { + try { + super.onRejection(e); + } finally { + markTaskAsCompleted("executor runnable (rejected)"); + } + } + + @Override + public void onAfter() { + markTaskAsCompleted("executor runnable"); + } + + @Override + protected void doRun() throws Exception { + 
abstractRunnable.run(); + } + + @Override + public void onFailure(Exception e) { + // we shouldn't really end up here. + throw new AssertionError("unexpected error", e); + } + }); + } + + private void markTaskAsStarted(String task) { + logger.trace("task [{}] started. count [{}]", task, pendingTasks.incrementAndGet()); + } + + private void markTaskAsCompleted(String task) { + final int left = pendingTasks.decrementAndGet(); + logger.trace("task [{}] completed. count [{}]", task, left); + if (left == 0) { + allTasksCompleted.countDown(); + } + } + + @Override + protected TransportResponseHandler getPingResponseHandler(PingingRound pingingRound, DiscoveryNode node) { + markTaskAsStarted("ping [" + node + "]"); + TransportResponseHandler original = super.getPingResponseHandler(pingingRound, node); + return new TransportResponseHandler() { + @Override + public UnicastPingResponse newInstance() { + return original.newInstance(); + } + + @Override + public void handleResponse(UnicastPingResponse response) { + original.handleResponse(response); + markTaskAsCompleted("ping [" + node + "]"); + } + + @Override + public void handleException(TransportException exp) { + original.handleException(exp); + markTaskAsCompleted("ping [" + node + "] (error)"); + } + + @Override + public String executor() { + return original.executor(); + } + }; + } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java index 9c70587d0e5..9fa680fc200 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java @@ -68,7 +68,7 @@ public class ZenPingTests extends ESTestCase { Collections.shuffle(pings, random()); ZenPing.PingCollection collection = new ZenPing.PingCollection(); - collection.addPings(pings); + pings.forEach(collection::addPing); List aggregate = collection.toList(); diff --git 
a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index aabc58887b2..0b46843cdb7 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -208,8 +208,8 @@ public class TCPTransportTests extends ESTestCase { @Override public NodeChannels getConnection(DiscoveryNode node) { - return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()], - ConnectionProfile.LIGHT_PROFILE); + return new NodeChannels(node, new Object[MockTcpTransport.LIGHT_PROFILE.getNumConnections()], + MockTcpTransport.LIGHT_PROFILE); } }; DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index fd756f6790e..f7d05f22cd5 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -113,7 +113,7 @@ public class TransportServiceHandshakeTests extends ESTestCase { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){ + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, MockTcpTransport.LIGHT_PROFILE)){ DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout); assertNotNull(connectedNode); // the name and version should be updated @@ -121,16 +121,6 @@ public class TransportServiceHandshakeTests extends ESTestCase { assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); 
assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } - - DiscoveryNode connectedNode = - handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); - assertNotNull(connectedNode); - - // the name and version should be updated - assertEquals(connectedNode.getName(), "TS_B"); - assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); - assertTrue(handleA.transportService.nodeConnected(discoveryNode)); - } public void testMismatchedClusterName() { @@ -145,7 +135,7 @@ public class TransportServiceHandshakeTests extends ESTestCase { Version.CURRENT.minimumCompatibilityVersion()); IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, - ConnectionProfile.LIGHT_PROFILE)) { + MockTcpTransport.LIGHT_PROFILE)) { handleA.transportService.handshake(connection, timeout); } }); @@ -166,7 +156,7 @@ public class TransportServiceHandshakeTests extends ESTestCase { Version.CURRENT.minimumCompatibilityVersion()); IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, - ConnectionProfile.LIGHT_PROFILE)) { + MockTcpTransport.LIGHT_PROFILE)) { handleA.transportService.handshake(connection, timeout); } }); diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index 55e2029c8b7..196e98d6582 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -43,7 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static 
org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT; -import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes; +import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists; /** * An implementation of {@link UnicastHostsProvider} that reads hosts/ports @@ -97,13 +97,13 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast final List discoNodes = new ArrayList<>(); try { - discoNodes.addAll(resolveDiscoveryNodes( + discoNodes.addAll(resolveHostsLists( executorService, logger, hostsList, 1, transportService, - () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#", + UNICAST_HOST_PREFIX, resolveTimeout)); } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index 920792b6c7a..314e195754a 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -33,9 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; import org.elasticsearch.transport.TransportService; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import java.io.BufferedWriter; import java.io.IOException; @@ -44,7 +42,6 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -99,13 +96,13 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { 
assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress()); assertEquals(9300, nodes.get(0).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "1#", nodes.get(0).getId()); + assertEquals(UNICAST_HOST_PREFIX + "192.168.0.1_0#", nodes.get(0).getId()); assertEquals("192.168.0.2", nodes.get(1).getAddress().getAddress()); assertEquals(9305, nodes.get(1).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "2#", nodes.get(1).getId()); + assertEquals(UNICAST_HOST_PREFIX + "192.168.0.2:9305_0#", nodes.get(1).getId()); assertEquals("255.255.23.15", nodes.get(2).getAddress().getAddress()); assertEquals(9300, nodes.get(2).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "3#", nodes.get(2).getId()); + assertEquals(UNICAST_HOST_PREFIX + "255.255.23.15_0#", nodes.get(2).getId()); } public void testEmptyUnicastHostsFile() throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index fe16f034116..b03e157b01c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -28,10 +28,9 @@ import org.elasticsearch.discovery.zen.PingContextProvider; import org.elasticsearch.discovery.zen.ZenPing; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; +import java.util.function.Consumer; /** * A {@link ZenPing} implementation which returns results based on an static in-memory map. 
This allows pinging @@ -62,7 +61,7 @@ public final class MockZenPing extends AbstractComponent implements ZenPing { } @Override - public void ping(PingListener listener, TimeValue timeout) { + public void ping(Consumer resultsConsumer, TimeValue timeout) { logger.info("pinging using mock zen ping"); synchronized (activeNodesPerCluster) { Set activeNodes = getActiveNodesForCurrentCluster(); @@ -76,11 +75,12 @@ public final class MockZenPing extends AbstractComponent implements ZenPing { activeNodes = getActiveNodesForCurrentCluster(); } lastDiscoveredPings = activeNodes; - List responseList = activeNodes.stream() + PingCollection pingCollection = new PingCollection(); + activeNodes.stream() .filter(p -> p != this) // remove this as pings are not expected to return the local node .map(MockZenPing::getPingResponse) - .collect(Collectors.toList()); - listener.onPing(responseList); + .forEach(pingCollection::addPing); + resultsConsumer.accept(pingCollection); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 540420f35e7..8852b386ffd 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -29,9 +29,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import 
org.elasticsearch.common.logging.Loggers; @@ -54,8 +52,6 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -1358,7 +1354,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // all is well } - try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){ serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { @@ -1416,7 +1412,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // all is well } - try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){ serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index acda061a704..a2d5f10483f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -66,6 +66,23 @@ import java.util.function.Consumer; */ public class MockTcpTransport extends TcpTransport { + /** + * A pre-built light connection profile that shares a single connection across all + * types. 
+ */ + public static final ConnectionProfile LIGHT_PROFILE; + + static { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + LIGHT_PROFILE = builder.build(); + } + private final ExecutorService executor; private final Version mockVersion; @@ -159,7 +176,7 @@ public class MockTcpTransport extends TcpTransport @Override protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException { final MockChannel[] mockChannels = new MockChannel[1]; - final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here + final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here boolean success = false; final Socket socket = new Socket(); try {