Simplify Unicast Zen Ping (#22277)
The `UnicastZenPing` shows it's age and is the result of many small changes. The current state of affairs is confusing and is hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are: 1) Clear 3 round flow - no interleaving of scheduling. 2) The previous implementation did a best effort attempt to wait for ongoing pings to be sent and completed. The pings were guaranteed to complete because each used the total ping duration as a timeout. This did make it hard to reason about the total ping duration and the flow of the code. All of this is removed now and ping should just complete within the given duration or not be counted (note that it was very handy for testing, but I move the needed sync logic to the test). 3) Because of (2) the pinging scheduling changed a bit, to give a chance for the last round to complete. We now ping at the beginning, 1/3 and 2/3 of the duration. 4) To offset for (3) a bit, incoming ping requests are now added to on going ping collections. 5) UnicastZenPing never establishes full blown connections (but does reuse them if there). Relates to #22120 6) Discovery host providers are only used once per pinging round. Closes #21739 7) Usage of the ability to open a connection without connecting to a node ( #22194 ) and shorter connection timeouts helps with connections piling up. Closes #19370 8) Beefed up testing and sped them up. 9) removed light profile from production code
This commit is contained in:
parent
567c65b0d5
commit
0e9186e137
|
@ -101,6 +101,21 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
|
|||
|
||||
private final TransportClient.HostFailureListener hostFailureListener;
|
||||
|
||||
// TODO: migrate this to use low level connections and single type channels
|
||||
/** {@link ConnectionProfile} to use when to connecting to the listed nodes and doing a liveness check */
|
||||
private static final ConnectionProfile LISTED_NODES_PROFILE;
|
||||
|
||||
static {
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.addConnections(1,
|
||||
TransportRequestOptions.Type.BULK,
|
||||
TransportRequestOptions.Type.PING,
|
||||
TransportRequestOptions.Type.RECOVERY,
|
||||
TransportRequestOptions.Type.REG,
|
||||
TransportRequestOptions.Type.STATE);
|
||||
LISTED_NODES_PROFILE = builder.build();
|
||||
}
|
||||
|
||||
TransportClientNodesService(Settings settings, TransportService transportService,
|
||||
ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
|
||||
super(settings);
|
||||
|
@ -389,8 +404,8 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
|
|||
if (!transportService.nodeConnected(listedNode)) {
|
||||
try {
|
||||
// its a listed node, light connect to it...
|
||||
logger.trace("connecting to listed node (light) [{}]", listedNode);
|
||||
transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
|
||||
logger.trace("connecting to listed node [{}]", listedNode);
|
||||
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
|
||||
} catch (Exception e) {
|
||||
logger.info(
|
||||
(Supplier<?>)
|
||||
|
@ -470,7 +485,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
|
|||
} else {
|
||||
// its a listed node, light connect to it...
|
||||
logger.trace("connecting to listed node (light) [{}]", listedNode);
|
||||
transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
|
||||
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug(
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.lucene.util.CollectionUtil;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -174,7 +173,7 @@ public class ElectMasterService extends AbstractComponent {
|
|||
* Returns the given nodes sorted by likelihood of being elected as master, most likely first.
|
||||
* Non-master nodes are not removed but are rather put in the end
|
||||
*/
|
||||
public static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
|
||||
static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
|
||||
ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
|
||||
CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
|
||||
return sortedNodes;
|
||||
|
|
|
@ -23,13 +23,12 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -44,10 +43,14 @@ import org.elasticsearch.common.util.CollectionUtils;
|
|||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.NodeNotConnectedException;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.Transport.Connection;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -60,8 +63,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -70,18 +73,17 @@ import java.util.Objects;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -116,22 +118,19 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
|
||||
private volatile PingContextProvider contextProvider;
|
||||
|
||||
private final AtomicInteger pingHandlerIdGenerator = new AtomicInteger();
|
||||
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
|
||||
|
||||
// used to generate unique ids for nodes/address we temporarily connect to
|
||||
private final AtomicInteger unicastNodeIdGenerator = new AtomicInteger();
|
||||
|
||||
// used as a node id prefix for nodes/address we temporarily connect to
|
||||
// used as a node id prefix for configured unicast host nodes/address
|
||||
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
|
||||
|
||||
private final Map<Integer, SendPingsHandler> receivedResponses = newConcurrentMap();
|
||||
private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();
|
||||
|
||||
// a list of temporal responses a node will return for a request (holds responses from other nodes)
|
||||
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
|
||||
|
||||
private final UnicastHostsProvider hostsProvider;
|
||||
|
||||
private final ExecutorService unicastZenPingExecutorService;
|
||||
protected final EsThreadPoolExecutor unicastZenPingExecutorService;
|
||||
|
||||
private final TimeValue resolveTimeout;
|
||||
|
||||
|
@ -146,15 +145,14 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
this.hostsProvider = unicastHostsProvider;
|
||||
|
||||
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
|
||||
final List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
if (hosts.isEmpty()) {
|
||||
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
|
||||
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
} else {
|
||||
// if unicast hosts are not specified, fill with simple defaults on the local machine
|
||||
configuredHosts = transportService.getLocalAddresses();
|
||||
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
|
||||
} else {
|
||||
configuredHosts = hosts;
|
||||
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
}
|
||||
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
|
||||
logger.debug(
|
||||
|
@ -164,7 +162,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
resolveTimeout);
|
||||
|
||||
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
|
||||
new UnicastPingRequestHandler());
|
||||
new UnicastPingRequestHandler());
|
||||
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
|
||||
unicastZenPingExecutorService = EsExecutors.newScaling(
|
||||
|
@ -186,23 +184,23 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
* @param hosts the hosts to resolve
|
||||
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
|
||||
* @param transportService the transport service
|
||||
* @param idGenerator the generator to supply unique ids for each discovery node
|
||||
* @param nodeId_prefix a prefix to use for node ids
|
||||
* @param resolveTimeout the timeout before returning from hostname lookups
|
||||
* @return a list of discovery nodes with resolved transport addresses
|
||||
*/
|
||||
public static List<DiscoveryNode> resolveDiscoveryNodes(
|
||||
public static List<DiscoveryNode> resolveHostsLists(
|
||||
final ExecutorService executorService,
|
||||
final Logger logger,
|
||||
final List<String> hosts,
|
||||
final int limitPortCounts,
|
||||
final TransportService transportService,
|
||||
final Supplier<String> idGenerator,
|
||||
final String nodeId_prefix,
|
||||
final TimeValue resolveTimeout) throws InterruptedException {
|
||||
Objects.requireNonNull(executorService);
|
||||
Objects.requireNonNull(logger);
|
||||
Objects.requireNonNull(hosts);
|
||||
Objects.requireNonNull(transportService);
|
||||
Objects.requireNonNull(idGenerator);
|
||||
Objects.requireNonNull(nodeId_prefix);
|
||||
Objects.requireNonNull(resolveTimeout);
|
||||
if (resolveTimeout.nanos() < 0) {
|
||||
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
|
||||
|
@ -211,7 +209,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
final List<Callable<TransportAddress[]>> callables =
|
||||
hosts
|
||||
.stream()
|
||||
.map(hn -> (Callable<TransportAddress[]>)() -> transportService.addressesFromString(hn, limitPortCounts))
|
||||
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
|
||||
.collect(Collectors.toList());
|
||||
final List<Future<TransportAddress[]>> futures =
|
||||
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
|
||||
|
@ -226,11 +224,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
try {
|
||||
final TransportAddress[] addresses = future.get();
|
||||
logger.trace("resolved host [{}] to {}", hostname, addresses);
|
||||
for (final TransportAddress address : addresses) {
|
||||
for (int addressId = 0; addressId < addresses.length; addressId++) {
|
||||
discoveryNodes.add(
|
||||
new DiscoveryNode(
|
||||
idGenerator.get(),
|
||||
address,
|
||||
nodeId_prefix + hostname + "_" + addressId + "#",
|
||||
addresses[addressId],
|
||||
emptyMap(),
|
||||
emptySet(),
|
||||
Version.CURRENT.minimumCompatibilityVersion()));
|
||||
|
@ -249,8 +247,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS);
|
||||
Releasables.close(receivedResponses.values());
|
||||
ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
|
||||
Releasables.close(activePingingRounds.values());
|
||||
closed = true;
|
||||
}
|
||||
|
||||
|
@ -266,106 +264,106 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
temporalResponses.clear();
|
||||
}
|
||||
|
||||
// test only
|
||||
Collection<PingResponse> pingAndWait(TimeValue duration) {
|
||||
final AtomicReference<Collection<PingResponse>> response = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ping(pings -> {
|
||||
response.set(pings);
|
||||
latch.countDown();
|
||||
}, duration);
|
||||
try {
|
||||
latch.await();
|
||||
return response.get();
|
||||
} catch (InterruptedException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends three rounds of pings notifying the specified {@link PingListener} when pinging is complete. Pings are sent after resolving
|
||||
* Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving
|
||||
* configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
|
||||
* of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
|
||||
* The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and
|
||||
* handshake is performed, with a connection timeout of the specified {@link TimeValue}.
|
||||
*
|
||||
* @param listener the callback when pinging is complete
|
||||
* @param duration the timeout for various components of the pings
|
||||
* @param resultsConsumer the callback when pinging is complete
|
||||
* @param duration the timeout for various components of the pings
|
||||
*/
|
||||
@Override
|
||||
public void ping(final PingListener listener, final TimeValue duration) {
|
||||
final List<DiscoveryNode> resolvedDiscoveryNodes;
|
||||
public void ping(final Consumer<PingCollection> resultsConsumer, final TimeValue duration) {
|
||||
ping(resultsConsumer, duration, duration);
|
||||
}
|
||||
|
||||
/**
|
||||
* a variant of {@link #ping(Consumer, TimeValue)}, but allows separating the scheduling duration
|
||||
* from the duration used for request level time outs. This is useful for testing
|
||||
*/
|
||||
protected void ping(final Consumer<PingCollection> resultsConsumer,
|
||||
final TimeValue scheduleDuration,
|
||||
final TimeValue requestDuration) {
|
||||
final List<DiscoveryNode> seedNodes;
|
||||
try {
|
||||
resolvedDiscoveryNodes = resolveDiscoveryNodes(
|
||||
seedNodes = resolveHostsLists(
|
||||
unicastZenPingExecutorService,
|
||||
logger,
|
||||
configuredHosts,
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
|
||||
UNICAST_NODE_PREFIX,
|
||||
resolveTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
|
||||
try {
|
||||
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
|
||||
try {
|
||||
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
|
||||
} catch (RejectedExecutionException e) {
|
||||
logger.debug("Ping execution rejected", e);
|
||||
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
|
||||
// But don't bail here, we can retry later on after the send ping has been scheduled.
|
||||
seedNodes.addAll(hostsProvider.buildDynamicNodes());
|
||||
final DiscoveryNodes nodes = contextProvider.nodes();
|
||||
// add all possible master nodes that were active in the last known cluster configuration
|
||||
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
|
||||
seedNodes.add(masterNode.value);
|
||||
}
|
||||
|
||||
final ConnectionProfile connectionProfile =
|
||||
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
|
||||
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
|
||||
nodes.getLocalNode(), connectionProfile);
|
||||
activePingingRounds.put(pingingRound.id(), pingingRound);
|
||||
final AbstractRunnable pingSender = new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof AlreadyClosedException == false) {
|
||||
logger.warn("unexpected error while pinging", e);
|
||||
}
|
||||
}
|
||||
|
||||
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes);
|
||||
sendPingsHandler.close();
|
||||
listener.onPing(sendPingsHandler.pingCollection().toList());
|
||||
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
|
||||
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
sendPings(requestDuration, pingingRound);
|
||||
}
|
||||
};
|
||||
threadPool.generic().execute(pingSender);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
|
||||
threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
finishPingingRound(pingingRound);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.debug("Ping execution failed", e);
|
||||
sendPingsHandler.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.debug("Ping execution failed", e);
|
||||
sendPingsHandler.close();
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) { // TODO: remove this once ScheduledExecutor has support for AbstractRunnable
|
||||
sendPingsHandler.close();
|
||||
// we are shutting down
|
||||
} catch (Exception e) {
|
||||
sendPingsHandler.close();
|
||||
throw new ElasticsearchException("Ping execution failed", e);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("unexpected error while finishing pinging round", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class SendPingsHandler implements Releasable {
|
||||
// for testing
|
||||
protected void finishPingingRound(PingingRound pingingRound) {
|
||||
pingingRound.close();
|
||||
}
|
||||
|
||||
protected class PingingRound implements Releasable {
|
||||
private final int id;
|
||||
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
|
||||
private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
|
||||
private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
|
||||
private final PingCollection pingCollection;
|
||||
private final List<DiscoveryNode> seedNodes;
|
||||
private final Consumer<PingCollection> pingListener;
|
||||
private final DiscoveryNode localNode;
|
||||
private final ConnectionProfile connectionProfile;
|
||||
|
||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
SendPingsHandler(int id) {
|
||||
PingingRound(int id, List<DiscoveryNode> seedNodes, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
|
||||
ConnectionProfile connectionProfile) {
|
||||
this.id = id;
|
||||
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
|
||||
this.pingListener = resultsConsumer;
|
||||
this.localNode = localNode;
|
||||
this.connectionProfile = connectionProfile;
|
||||
this.pingCollection = new PingCollection();
|
||||
}
|
||||
|
||||
|
@ -377,154 +375,170 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
return this.closed.get();
|
||||
}
|
||||
|
||||
public PingCollection pingCollection() {
|
||||
return pingCollection;
|
||||
public List<DiscoveryNode> getSeedNodes() {
|
||||
ensureOpen();
|
||||
return seedNodes;
|
||||
}
|
||||
|
||||
public Connection getOrConnect(DiscoveryNode node) throws IOException {
|
||||
Connection result;
|
||||
try (Releasable ignore = connectionLock.acquire(node.getAddress())) {
|
||||
result = tempConnections.get(node.getAddress());
|
||||
if (result == null) {
|
||||
boolean success = false;
|
||||
result = transportService.openConnection(node, connectionProfile);
|
||||
try {
|
||||
transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis());
|
||||
synchronized (this) {
|
||||
// acquire lock to prevent concurrent closing
|
||||
Connection existing = tempConnections.put(node.getAddress(), result);
|
||||
assert existing == null;
|
||||
success = true;
|
||||
}
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.closeWhileHandlingException(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void ensureOpen() {
|
||||
if (isClosed()) {
|
||||
throw new AlreadyClosedException("pinging round [" + id + "] is finished");
|
||||
}
|
||||
}
|
||||
|
||||
public void addPingResponseToCollection(PingResponse pingResponse) {
|
||||
if (localNode.equals(pingResponse.node()) == false) {
|
||||
pingCollection.addPing(pingResponse);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
receivedResponses.remove(id);
|
||||
List<Connection> toClose = null;
|
||||
synchronized (this) {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
activePingingRounds.remove(id);
|
||||
toClose = new ArrayList<>(tempConnections.values());
|
||||
tempConnections.clear();
|
||||
}
|
||||
}
|
||||
if (toClose != null) {
|
||||
// we actually closed
|
||||
try {
|
||||
pingListener.accept(pingCollection);
|
||||
} finally {
|
||||
IOUtils.closeWhileHandlingException(toClose);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ConnectionProfile getConnectionProfile() {
|
||||
return connectionProfile;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void sendPings(
|
||||
final TimeValue timeout,
|
||||
@Nullable TimeValue waitTime,
|
||||
final SendPingsHandler sendPingsHandler,
|
||||
final List<DiscoveryNode> resolvedDiscoveryNodes) {
|
||||
protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {
|
||||
final UnicastPingRequest pingRequest = new UnicastPingRequest();
|
||||
pingRequest.id = sendPingsHandler.id();
|
||||
pingRequest.id = pingingRound.id();
|
||||
pingRequest.timeout = timeout;
|
||||
DiscoveryNodes discoNodes = contextProvider.nodes();
|
||||
|
||||
pingRequest.pingResponse = createPingResponse(discoNodes);
|
||||
|
||||
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
|
||||
for (PingResponse temporalResponse : temporalResponses) {
|
||||
// Only send pings to nodes that have the same cluster name.
|
||||
if (clusterName.equals(temporalResponse.clusterName())) {
|
||||
nodesToPingSet.add(temporalResponse.node());
|
||||
}
|
||||
}
|
||||
nodesToPingSet.addAll(hostsProvider.buildDynamicNodes());
|
||||
Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
|
||||
assert clusterName.equals(pingResponse.clusterName()) :
|
||||
"got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
|
||||
return pingResponse.node();
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
// add all possible master nodes that were active in the last known cluster configuration
|
||||
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {
|
||||
nodesToPingSet.add(masterNode.value);
|
||||
}
|
||||
// dedup by address
|
||||
final Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress =
|
||||
Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream())
|
||||
.collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1));
|
||||
|
||||
// sort the nodes by likelihood of being an active master
|
||||
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);
|
||||
|
||||
// add the configured hosts first
|
||||
final List<DiscoveryNode> nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size());
|
||||
nodesToPing.addAll(resolvedDiscoveryNodes);
|
||||
nodesToPing.addAll(sortedNodesToPing);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
|
||||
for (final DiscoveryNode node : nodesToPing) {
|
||||
// make sure we are connected
|
||||
final boolean nodeFoundByAddress;
|
||||
DiscoveryNode nodeToSend = discoNodes.findByAddress(node.getAddress());
|
||||
if (nodeToSend != null) {
|
||||
nodeFoundByAddress = true;
|
||||
} else {
|
||||
nodeToSend = node;
|
||||
nodeFoundByAddress = false;
|
||||
}
|
||||
|
||||
if (!transportService.nodeConnected(nodeToSend)) {
|
||||
if (sendPingsHandler.isClosed()) {
|
||||
return;
|
||||
// resolve what we can via the latest cluster state
|
||||
final Set<DiscoveryNode> nodesToPing = uniqueNodesByAddress.values().stream()
|
||||
.map(node -> {
|
||||
DiscoveryNode foundNode = discoNodes.findByAddress(node.getAddress());
|
||||
if (foundNode == null) {
|
||||
return node;
|
||||
} else {
|
||||
return foundNode;
|
||||
}
|
||||
// if we find on the disco nodes a matching node by address, we are going to restore the connection
|
||||
// anyhow down the line if its not connected...
|
||||
// if we can't resolve the node, we don't know and we have to clean up after pinging. We do have
|
||||
// to make sure we don't disconnect a true node which was temporarily removed from the DiscoveryNodes
|
||||
// but will be added again during the pinging. We therefore create a new temporary node
|
||||
if (!nodeFoundByAddress) {
|
||||
if (!nodeToSend.getId().startsWith(UNICAST_NODE_PREFIX)) {
|
||||
DiscoveryNode tempNode = new DiscoveryNode("",
|
||||
UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.getId() + "#",
|
||||
UUIDs.randomBase64UUID(), nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.getAddress(),
|
||||
nodeToSend.getAttributes(), nodeToSend.getRoles(), nodeToSend.getVersion());
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
logger.trace("replacing {} with temp node {}", nodeToSend, tempNode);
|
||||
nodeToSend = tempNode;
|
||||
}
|
||||
sendPingsHandler.nodeToDisconnect.add(nodeToSend);
|
||||
}
|
||||
// fork the connection to another thread
|
||||
final DiscoveryNode finalNodeToSend = nodeToSend;
|
||||
unicastZenPingExecutorService.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (sendPingsHandler.isClosed()) {
|
||||
return;
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
// connect to the node, see if we manage to do it, if not, bail
|
||||
if (!nodeFoundByAddress) {
|
||||
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
|
||||
transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis());
|
||||
} else {
|
||||
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
|
||||
transportService.connectToNode(finalNodeToSend);
|
||||
}
|
||||
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
|
||||
if (receivedResponses.containsKey(sendPingsHandler.id())) {
|
||||
// we are connected and still in progress, send the ping request
|
||||
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
|
||||
} else {
|
||||
// connect took too long, just log it and bail
|
||||
latch.countDown();
|
||||
logger.trace("[{}] connect to {} was too long outside of ping window, bailing",
|
||||
sendPingsHandler.id(), node);
|
||||
}
|
||||
success = true;
|
||||
} catch (ConnectTransportException e) {
|
||||
// can't connect to the node - this is a more common path!
|
||||
logger.trace(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] failed to connect to {}", sendPingsHandler.id(), finalNodeToSend), e);
|
||||
} catch (RemoteTransportException e) {
|
||||
// something went wrong on the other side
|
||||
logger.debug(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] received a remote error as a response to ping {}", sendPingsHandler.id(), finalNodeToSend), e);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] failed send ping to {}", sendPingsHandler.id(), finalNodeToSend), e);
|
||||
} finally {
|
||||
if (!success) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
|
||||
}
|
||||
}
|
||||
if (waitTime != null) {
|
||||
try {
|
||||
latch.await(waitTime.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
|
||||
}
|
||||
|
||||
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest,
|
||||
final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
|
||||
logger.trace("[{}] sending to {}", id, nodeToSend);
|
||||
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder()
|
||||
.withTimeout((long) (timeout.millis() * 1.25)).build(), new TransportResponseHandler<UnicastPingResponse>() {
|
||||
private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound,
|
||||
final UnicastPingRequest pingRequest) {
|
||||
submitToExecutor(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
Connection connection = null;
|
||||
if (transportService.nodeConnected(node)) {
|
||||
try {
|
||||
// concurrency can still cause disconnects
|
||||
connection = transportService.getConnection(node);
|
||||
} catch (NodeNotConnectedException e) {
|
||||
logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node);
|
||||
}
|
||||
}
|
||||
|
||||
if (connection == null) {
|
||||
connection = pingingRound.getOrConnect(node);
|
||||
}
|
||||
|
||||
logger.trace("[{}] sending to {}", pingingRound.id(), node);
|
||||
transportService.sendRequest(connection, ACTION_NAME, pingRequest,
|
||||
TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(),
|
||||
getPingResponseHandler(pingingRound, node));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) {
|
||||
// can't connect to the node - this is more common path!
|
||||
logger.trace(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] failed to ping {}", pingingRound.id(), node), e);
|
||||
} else if (e instanceof RemoteTransportException) {
|
||||
// something went wrong on the other side
|
||||
logger.debug(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e);
|
||||
} else {
|
||||
logger.warn(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] failed send ping to {}", pingingRound.id(), node), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRejection(Exception e) {
|
||||
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
|
||||
// But don't bail here, we can retry later on after the send ping has been scheduled.
|
||||
logger.debug("Ping execution rejected", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// for testing
|
||||
protected void submitToExecutor(AbstractRunnable abstractRunnable) {
|
||||
unicastZenPingExecutorService.execute(abstractRunnable);
|
||||
}
|
||||
|
||||
// for testing
|
||||
protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound,
|
||||
final DiscoveryNode node) {
|
||||
return new TransportResponseHandler<UnicastPingResponse>() {
|
||||
|
||||
@Override
|
||||
public UnicastPingResponse newInstance() {
|
||||
|
@ -538,50 +552,36 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
|
||||
@Override
|
||||
public void handleResponse(UnicastPingResponse response) {
|
||||
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
|
||||
try {
|
||||
DiscoveryNodes discoveryNodes = contextProvider.nodes();
|
||||
for (PingResponse pingResponse : response.pingResponses) {
|
||||
if (pingResponse.node().equals(discoveryNodes.getLocalNode())) {
|
||||
// that's us, ignore
|
||||
continue;
|
||||
}
|
||||
SendPingsHandler sendPingsHandler = receivedResponses.get(response.id);
|
||||
if (sendPingsHandler == null) {
|
||||
if (!closed) {
|
||||
// Only log when we're not closing the node. Having no send ping handler is then expected
|
||||
logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id);
|
||||
}
|
||||
} else {
|
||||
sendPingsHandler.pingCollection().addPing(pingResponse);
|
||||
}
|
||||
logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
|
||||
if (pingingRound.isClosed()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
|
||||
}
|
||||
} finally {
|
||||
latch.countDown();
|
||||
} else {
|
||||
Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
latch.countDown();
|
||||
if (exp instanceof ConnectTransportException) {
|
||||
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
|
||||
// ok, not connected...
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp);
|
||||
} else {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", node), exp);
|
||||
} else if (closed == false) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
|
||||
assert clusterName.equals(request.pingResponse.clusterName()) :
|
||||
"got a ping request from a different cluster. expected " + clusterName + " got " + request.pingResponse.clusterName();
|
||||
temporalResponses.add(request.pingResponse);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
temporalResponses.remove(request.pingResponse);
|
||||
}
|
||||
});
|
||||
// add to any ongoing pinging
|
||||
activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
|
||||
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME,
|
||||
() -> temporalResponses.remove(request.pingResponse));
|
||||
|
||||
List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
|
||||
pingResponses.add(createPingResponse(contextProvider.nodes()));
|
||||
|
@ -601,11 +601,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
channel.sendResponse(handlePingRequest(request));
|
||||
} else {
|
||||
throw new IllegalStateException(
|
||||
String.format(
|
||||
Locale.ROOT,
|
||||
"mismatched cluster names; request: [%s], local: [%s]",
|
||||
request.pingResponse.clusterName().value(),
|
||||
clusterName.value()));
|
||||
String.format(
|
||||
Locale.ROOT,
|
||||
"mismatched cluster names; request: [%s], local: [%s]",
|
||||
request.pingResponse.clusterName().value(),
|
||||
clusterName.value()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,11 +67,11 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -1021,24 +1021,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
|
||||
final ZenPing.PingCollection response = new ZenPing.PingCollection();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CompletableFuture<ZenPing.PingCollection> response = new CompletableFuture<>();
|
||||
try {
|
||||
zenPing.ping(pings -> {
|
||||
response.addPings(pings);
|
||||
latch.countDown();
|
||||
}, timeout);
|
||||
zenPing.ping(response::complete, timeout);
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Ping execution failed", ex);
|
||||
latch.countDown();
|
||||
// logged later
|
||||
response.completeExceptionally(ex);
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
return response;
|
||||
return response.get();
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("pingAndWait interrupted");
|
||||
return response;
|
||||
return new ZenPing.PingCollection();
|
||||
} catch (ExecutionException e) {
|
||||
logger.warn("Ping execution failed", e);
|
||||
return new ZenPing.PingCollection();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,11 +30,11 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
|
@ -42,17 +42,7 @@ public interface ZenPing extends Releasable {
|
|||
|
||||
void start(PingContextProvider contextProvider);
|
||||
|
||||
void ping(PingListener listener, TimeValue timeout);
|
||||
|
||||
interface PingListener {
|
||||
|
||||
/**
|
||||
* called when pinging is done.
|
||||
*
|
||||
* @param pings ping result *must
|
||||
*/
|
||||
void onPing(Collection<PingResponse> pings);
|
||||
}
|
||||
void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);
|
||||
|
||||
class PingResponse implements Streamable {
|
||||
|
||||
|
@ -191,13 +181,6 @@ public interface ZenPing extends Releasable {
|
|||
return false;
|
||||
}
|
||||
|
||||
/** adds multiple pings if newer than previous pings from the same node */
|
||||
public synchronized void addPings(Iterable<PingResponse> pings) {
|
||||
for (PingResponse ping : pings) {
|
||||
addPing(ping);
|
||||
}
|
||||
}
|
||||
|
||||
/** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
|
||||
public synchronized List<PingResponse> toList() {
|
||||
return new ArrayList<>(pings.values());
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -35,16 +36,25 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public final class ConnectionProfile {
|
||||
|
||||
/**
|
||||
* A pre-built light connection profile that shares a single connection across all
|
||||
* types.
|
||||
* Builds a connection profile that is dedicated to a single channel type. Use this
|
||||
* when opening single use connections
|
||||
*/
|
||||
public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile(
|
||||
Collections.singletonList(new ConnectionTypeHandle(0, 1, EnumSet.of(
|
||||
TransportRequestOptions.Type.BULK,
|
||||
TransportRequestOptions.Type.PING,
|
||||
TransportRequestOptions.Type.RECOVERY,
|
||||
TransportRequestOptions.Type.REG,
|
||||
TransportRequestOptions.Type.STATE))), 1, null, null);
|
||||
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType,
|
||||
@Nullable TimeValue connectTimeout,
|
||||
@Nullable TimeValue handshakeTimeout) {
|
||||
Builder builder = new Builder();
|
||||
builder.addConnections(1, channelType);
|
||||
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
|
||||
otherTypes.remove(channelType);
|
||||
builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new));
|
||||
if (connectTimeout != null) {
|
||||
builder.setConnectTimeout(connectTimeout);
|
||||
}
|
||||
if (handshakeTimeout != null) {
|
||||
builder.setHandshakeTimeout(handshakeTimeout);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private final List<ConnectionTypeHandle> handles;
|
||||
private final int numConnections;
|
||||
|
|
|
@ -63,8 +63,7 @@ public interface Transport extends LifecycleComponent {
|
|||
boolean nodeConnected(DiscoveryNode node);
|
||||
|
||||
/**
|
||||
* Connects to a node with the given connection profile. Use {@link ConnectionProfile#LIGHT_PROFILE} when just connecting for ping
|
||||
* and then disconnecting. If the node is already connected this method has no effect
|
||||
* Connects to a node with the given connection profile. If the node is already connected this method has no effect
|
||||
*/
|
||||
void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException;
|
||||
|
||||
|
|
|
@ -62,7 +62,6 @@ import java.util.Objects;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -328,32 +327,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Lightly connect to the specified node, returning updated node
|
||||
* information. The handshake will fail if the cluster name on the
|
||||
* target node mismatches the local cluster name and
|
||||
* {@code checkClusterName} is {@code true}.
|
||||
*
|
||||
* @param node the node to connect to
|
||||
* @param handshakeTimeout handshake timeout
|
||||
* @return the connected node
|
||||
* @throws ConnectTransportException if the connection failed
|
||||
* @throws IllegalStateException if the handshake failed
|
||||
*/
|
||||
public DiscoveryNode connectToNodeAndHandshake(
|
||||
final DiscoveryNode node,
|
||||
final long handshakeTimeout) throws IOException {
|
||||
if (node.equals(localNode)) {
|
||||
return localNode;
|
||||
}
|
||||
DiscoveryNode handshakeNode;
|
||||
try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) {
|
||||
handshakeNode = handshake(connection, handshakeTimeout);
|
||||
}
|
||||
connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
|
||||
return handshakeNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a high-level handshake using the given connection
|
||||
* and returns the discovery node of the node the connection
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
|
@ -34,18 +35,22 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
import org.junit.After;
|
||||
|
@ -60,12 +65,14 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -82,7 +89,6 @@ import static java.util.Collections.emptySet;
|
|||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -124,8 +130,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList;
|
||||
|
||||
@TestLogging("org.elasticsearch.transport:TRACE,org.elasticsearch.discovery.zen.UnicastZenPing:TRACE")
|
||||
public void testSimplePings() throws IOException, InterruptedException {
|
||||
public void testSimplePings() throws IOException, InterruptedException, ExecutionException {
|
||||
// use ephemeral ports
|
||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
||||
final Settings settingsMismatch =
|
||||
|
@ -140,7 +145,12 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
new NoneCircuitBreakerService(),
|
||||
new NamedWriteableRegistry(Collections.emptyList()),
|
||||
networkService,
|
||||
v);
|
||||
v) {
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) {
|
||||
throw new AssertionError("zen pings should never connect to node (got [" + node + "])");
|
||||
}
|
||||
};
|
||||
|
||||
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
|
||||
closeables.push(handleA.transportService);
|
||||
|
@ -148,25 +158,30 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
closeables.push(handleB.transportService);
|
||||
NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier);
|
||||
closeables.push(handleC.transportService);
|
||||
// just fake that no versions are compatible with this node
|
||||
Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion());
|
||||
Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion);
|
||||
final Version versionD;
|
||||
if (randomBoolean()) {
|
||||
versionD = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
||||
} else {
|
||||
versionD = Version.CURRENT;
|
||||
}
|
||||
logger.info("UZP_D version set to [{}]", versionD);
|
||||
NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier);
|
||||
closeables.push(handleD.transportService);
|
||||
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
|
||||
final ClusterState stateMismatch = ClusterState.builder(new ClusterName("mismatch")).version(randomPositiveLong()).build();
|
||||
|
||||
Settings hostsSettings = Settings.builder()
|
||||
.putArray("discovery.zen.ping.unicast.hosts",
|
||||
.putArray("discovery.zen.ping.unicast.hosts",
|
||||
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
|
||||
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())),
|
||||
NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())),
|
||||
NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort())))
|
||||
.put("cluster.name", "test")
|
||||
.build();
|
||||
.put("cluster.name", "test")
|
||||
.build();
|
||||
|
||||
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
|
||||
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingA.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -180,7 +195,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
});
|
||||
closeables.push(zenPingA);
|
||||
|
||||
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingB.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -194,7 +209,8 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
});
|
||||
closeables.push(zenPingB);
|
||||
|
||||
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) {
|
||||
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC,
|
||||
EMPTY_HOSTS_PROVIDER) {
|
||||
@Override
|
||||
protected Version getVersion() {
|
||||
return versionD;
|
||||
|
@ -208,12 +224,13 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
return stateMismatch;
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingC);
|
||||
|
||||
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD,
|
||||
EMPTY_HOSTS_PROVIDER);
|
||||
zenPingD.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -222,40 +239,48 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
return stateMismatch;
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingD);
|
||||
|
||||
logger.info("ping from UZP_A");
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_B"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
|
||||
assertCountersMoreThan(handleA, handleB, handleC, handleD);
|
||||
assertPingCount(handleA, handleB, 3);
|
||||
assertPingCount(handleA, handleC, 0); // mismatch, shouldn't ping
|
||||
assertPingCount(handleA, handleD, 0); // mismatch, shouldn't ping
|
||||
|
||||
// ping again, this time from B,
|
||||
logger.info("ping from UZP_B");
|
||||
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
pingResponses = zenPingB.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_A"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION));
|
||||
assertCountersMoreThan(handleB, handleA, handleC, handleD);
|
||||
assertPingCount(handleB, handleA, 3);
|
||||
assertPingCount(handleB, handleC, 0); // mismatch, shouldn't ping
|
||||
assertPingCount(handleB, handleD, 0); // mismatch, shouldn't ping
|
||||
|
||||
logger.info("ping from UZP_C");
|
||||
pingResponses = zenPingC.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
assertThat(pingResponses.size(), equalTo(0));
|
||||
assertCountersMoreThan(handleC, handleA, handleB, handleD);
|
||||
pingResponses = zenPingC.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
assertPingCount(handleC, handleA, 0);
|
||||
assertPingCount(handleC, handleB, 0);
|
||||
assertPingCount(handleC, handleD, 3);
|
||||
|
||||
logger.info("ping from UZP_D");
|
||||
pingResponses = zenPingD.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
assertThat(pingResponses.size(), equalTo(0));
|
||||
assertCountersMoreThan(handleD, handleA, handleB, handleC);
|
||||
pingResponses = zenPingD.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
assertPingCount(handleD, handleA, 0);
|
||||
assertPingCount(handleD, handleB, 0);
|
||||
assertPingCount(handleD, handleC, 3);
|
||||
}
|
||||
|
||||
public void testUnknownHostNotCached() {
|
||||
public void testUnknownHostNotCached() throws ExecutionException, InterruptedException {
|
||||
// use ephemeral ports
|
||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
||||
|
||||
|
@ -306,7 +331,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
|
||||
|
||||
final UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingA.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -320,7 +345,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
});
|
||||
closeables.push(zenPingA);
|
||||
|
||||
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingB.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -334,7 +359,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
});
|
||||
closeables.push(zenPingB);
|
||||
|
||||
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingC.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
|
@ -350,12 +375,13 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
// the presence of an unresolvable host should not prevent resolvable hosts from being pinged
|
||||
{
|
||||
final Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
final Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_C"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
|
||||
assertCountersMoreThan(handleA, handleC);
|
||||
assertPingCount(handleA, handleB, 0);
|
||||
assertPingCount(handleA, handleC, 3);
|
||||
assertNull(handleA.counters.get(handleB.address));
|
||||
}
|
||||
|
||||
|
@ -373,11 +399,13 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
// now we should see pings to UZP_B; this establishes that host resolutions are not cached
|
||||
{
|
||||
final Collection<ZenPing.PingResponse> secondPingResponses = zenPingA.pingAndWait(TimeValue.timeValueMillis(500));
|
||||
handleA.counters.clear();
|
||||
final Collection<ZenPing.PingResponse> secondPingResponses = zenPingA.pingAndWait().toList();
|
||||
assertThat(secondPingResponses.size(), equalTo(2));
|
||||
final Set<String> ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList()));
|
||||
assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_B", "UZP_C"))));
|
||||
assertCountersMoreThan(moreThan, handleA, handleB, handleC);
|
||||
assertPingCount(handleA, handleB, 3);
|
||||
assertPingCount(handleA, handleC, 3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -395,15 +423,14 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
final TransportService transportService =
|
||||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
closeables.push(transportService);
|
||||
final AtomicInteger idGenerator = new AtomicInteger();
|
||||
final int limitPortCounts = randomIntBetween(1, 10);
|
||||
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Collections.singletonList("127.0.0.1"),
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
() -> Integer.toString(idGenerator.incrementAndGet()),
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1));
|
||||
assertThat(discoveryNodes, hasSize(limitPortCounts));
|
||||
final Set<Integer> ports = new HashSet<>();
|
||||
|
@ -439,15 +466,14 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
final TransportService transportService =
|
||||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
closeables.push(transportService);
|
||||
final AtomicInteger idGenerator = new AtomicInteger();
|
||||
|
||||
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList(hostname),
|
||||
1,
|
||||
transportService,
|
||||
() -> Integer.toString(idGenerator.incrementAndGet()),
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1)
|
||||
);
|
||||
|
||||
|
@ -490,16 +516,15 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
final TransportService transportService =
|
||||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
closeables.push(transportService);
|
||||
final AtomicInteger idGenerator = new AtomicInteger();
|
||||
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
|
||||
try {
|
||||
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList("hostname1", "hostname2"),
|
||||
1,
|
||||
transportService,
|
||||
() -> Integer.toString(idGenerator.incrementAndGet()),
|
||||
"test+",
|
||||
resolveTimeout);
|
||||
|
||||
assertThat(discoveryNodes, hasSize(1));
|
||||
|
@ -513,6 +538,156 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException {
|
||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
||||
|
||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
||||
|
||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||
s,
|
||||
threadPool,
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(),
|
||||
new NamedWriteableRegistry(Collections.emptyList()),
|
||||
networkService,
|
||||
v);
|
||||
|
||||
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class));
|
||||
closeables.push(handleA.transportService);
|
||||
NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class));
|
||||
closeables.push(handleB.transportService);
|
||||
|
||||
final boolean useHosts = randomBoolean();
|
||||
final Settings.Builder hostsSettingsBuilder = Settings.builder().put("cluster.name", "test");
|
||||
if (useHosts) {
|
||||
hostsSettingsBuilder.putArray("discovery.zen.ping.unicast.hosts",
|
||||
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))
|
||||
);
|
||||
} else {
|
||||
hostsSettingsBuilder.put("discovery.zen.ping.unicast.hosts", (String) null);
|
||||
}
|
||||
final Settings hostsSettings = hostsSettingsBuilder.build();
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
|
||||
|
||||
// connection to reuse
|
||||
handleA.transportService.connectToNode(handleB.node);
|
||||
|
||||
// install a listener to check that no new connections are made
|
||||
handleA.transportService.addConnectionListener(new TransportConnectionListener() {
|
||||
@Override
|
||||
public void onConnectionOpened(DiscoveryNode node) {
|
||||
fail("should not open any connections. got [" + node + "]");
|
||||
}
|
||||
});
|
||||
|
||||
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingA.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingA);
|
||||
|
||||
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingB.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingB);
|
||||
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_B"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
|
||||
|
||||
}
|
||||
|
||||
public void testPingingTemporalPings() throws ExecutionException, InterruptedException {
|
||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
||||
|
||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
||||
|
||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||
s,
|
||||
threadPool,
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
new NoneCircuitBreakerService(),
|
||||
new NamedWriteableRegistry(Collections.emptyList()),
|
||||
networkService,
|
||||
v);
|
||||
|
||||
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class));
|
||||
closeables.push(handleA.transportService);
|
||||
NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier, EnumSet.allOf(Role.class));
|
||||
closeables.push(handleB.transportService);
|
||||
|
||||
final Settings hostsSettings = Settings.builder()
|
||||
.put("cluster.name", "test")
|
||||
.put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity
|
||||
.build();
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
|
||||
|
||||
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingA.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingA);
|
||||
|
||||
// Node B doesn't know about A!
|
||||
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingB.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
}
|
||||
});
|
||||
closeables.push(zenPingB);
|
||||
|
||||
{
|
||||
logger.info("pinging from UZP_A so UZP_B will learn about it");
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_B"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
|
||||
}
|
||||
{
|
||||
logger.info("pinging from UZP_B");
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingB.pingAndWait().toList();
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_A"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(-1L)); // A has a block
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidHosts() throws InterruptedException {
|
||||
final Logger logger = mock(Logger.class);
|
||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
||||
|
@ -529,14 +704,13 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
final TransportService transportService =
|
||||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
closeables.push(transportService);
|
||||
final AtomicInteger idGenerator = new AtomicInteger();
|
||||
final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
|
||||
1,
|
||||
transportService,
|
||||
() -> Integer.toString(idGenerator.incrementAndGet()),
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1));
|
||||
assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used
|
||||
assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1"));
|
||||
|
@ -544,24 +718,13 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class));
|
||||
}
|
||||
|
||||
// assert that we tried to ping each of the configured nodes at least once
|
||||
private void assertCountersMoreThan(final NetworkHandle that, final NetworkHandle...handles) {
|
||||
final HashMap<TransportAddress, Integer> moreThan = new HashMap<>();
|
||||
for (final NetworkHandle handle : handles) {
|
||||
assert handle != that;
|
||||
moreThan.put(handle.address, 0);
|
||||
}
|
||||
assertCountersMoreThan(moreThan, that, handles);
|
||||
}
|
||||
|
||||
private void assertCountersMoreThan(
|
||||
final Map<TransportAddress, Integer> moreThan,
|
||||
final NetworkHandle that,
|
||||
final NetworkHandle... handles) {
|
||||
for (final NetworkHandle handle : handles) {
|
||||
assert handle != that;
|
||||
assertThat(that.counters.get(handle.address).get(), greaterThan(moreThan.get(handle.address)));
|
||||
}
|
||||
private void assertPingCount(final NetworkHandle fromNode, final NetworkHandle toNode, int expectedCount) {
|
||||
final AtomicInteger counter = fromNode.counters.getOrDefault(toNode.address, new AtomicInteger());
|
||||
final String onNodeName = fromNode.node.getName();
|
||||
assertNotNull("handle for [" + onNodeName + "] has no 'expected' counter", counter);
|
||||
final String forNodeName = toNode.node.getName();
|
||||
assertThat("node [" + onNodeName + "] ping count to [" + forNodeName + "] is unexpected",
|
||||
counter.get(), equalTo(expectedCount));
|
||||
}
|
||||
|
||||
private NetworkHandle startServices(
|
||||
|
@ -570,31 +733,36 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
final String nodeId,
|
||||
final Version version,
|
||||
final BiFunction<Settings, Version, Transport> supplier) {
|
||||
final Transport transport = supplier.apply(settings, version);
|
||||
final TransportService transportService =
|
||||
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
return startServices(settings, threadPool, nodeId, version, supplier, emptySet());
|
||||
|
||||
}
|
||||
|
||||
private NetworkHandle startServices(
|
||||
final Settings settings,
|
||||
final ThreadPool threadPool,
|
||||
final String nodeId,
|
||||
final Version version,
|
||||
final BiFunction<Settings, Version, Transport> supplier,
|
||||
final Set<Role> nodeRoles) {
|
||||
final Settings nodeSettings = Settings.builder().put(settings)
|
||||
.put("node.name", nodeId)
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "internal:discovery/zen/unicast")
|
||||
.build();
|
||||
final Transport transport = supplier.apply(nodeSettings, version);
|
||||
final MockTransportService transportService =
|
||||
new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
|
||||
transportService.addConnectionListener(new TransportConnectionListener() {
|
||||
|
||||
transportService.addTracer(new MockTransportService.Tracer() {
|
||||
@Override
|
||||
public void onNodeConnected(DiscoveryNode node) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionOpened(DiscoveryNode node) {
|
||||
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
||||
counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger());
|
||||
counters.get(node.getAddress()).incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodeDisconnected(DiscoveryNode node) {
|
||||
}
|
||||
|
||||
});
|
||||
final DiscoveryNode node =
|
||||
new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version);
|
||||
new DiscoveryNode(nodeId, nodeId, transportService.boundAddress().publishAddress(), emptyMap(), nodeRoles, version);
|
||||
transportService.setLocalNode(node);
|
||||
return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters);
|
||||
}
|
||||
|
@ -616,7 +784,123 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
this.node = discoveryNode;
|
||||
this.counters = counters;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestUnicastZenPing extends UnicastZenPing {
|
||||
|
||||
public TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle,
|
||||
UnicastHostsProvider unicastHostsProvider) {
|
||||
super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(),
|
||||
threadPool, networkHandle.transportService, unicastHostsProvider);
|
||||
}
|
||||
|
||||
volatile CountDownLatch allTasksCompleted;
|
||||
volatile AtomicInteger pendingTasks;
|
||||
|
||||
PingCollection pingAndWait() throws ExecutionException, InterruptedException {
|
||||
allTasksCompleted = new CountDownLatch(1);
|
||||
pendingTasks = new AtomicInteger();
|
||||
// make the three sending rounds to come as started
|
||||
markTaskAsStarted("send pings");
|
||||
markTaskAsStarted("send pings");
|
||||
markTaskAsStarted("send pings");
|
||||
final CompletableFuture<PingCollection> response = new CompletableFuture<>();
|
||||
try {
|
||||
ping(response::complete, TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(1));
|
||||
} catch (Exception ex) {
|
||||
response.completeExceptionally(ex);
|
||||
}
|
||||
return response.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishPingingRound(PingingRound pingingRound) {
|
||||
// wait for all activity to finish before closing
|
||||
try {
|
||||
allTasksCompleted.await();
|
||||
} catch (InterruptedException e) {
|
||||
// ok, finish anyway
|
||||
}
|
||||
super.finishPingingRound(pingingRound);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendPings(TimeValue timeout, PingingRound pingingRound) {
|
||||
super.sendPings(timeout, pingingRound);
|
||||
markTaskAsCompleted("send pings");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void submitToExecutor(AbstractRunnable abstractRunnable) {
|
||||
markTaskAsStarted("executor runnable");
|
||||
super.submitToExecutor(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onRejection(Exception e) {
|
||||
try {
|
||||
super.onRejection(e);
|
||||
} finally {
|
||||
markTaskAsCompleted("executor runnable (rejected)");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
markTaskAsCompleted("executor runnable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
abstractRunnable.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// we shouldn't really end up here.
|
||||
throw new AssertionError("unexpected error", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void markTaskAsStarted(String task) {
|
||||
logger.trace("task [{}] started. count [{}]", task, pendingTasks.incrementAndGet());
|
||||
}
|
||||
|
||||
private void markTaskAsCompleted(String task) {
|
||||
final int left = pendingTasks.decrementAndGet();
|
||||
logger.trace("task [{}] completed. count [{}]", task, left);
|
||||
if (left == 0) {
|
||||
allTasksCompleted.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(PingingRound pingingRound, DiscoveryNode node) {
|
||||
markTaskAsStarted("ping [" + node + "]");
|
||||
TransportResponseHandler<UnicastPingResponse> original = super.getPingResponseHandler(pingingRound, node);
|
||||
return new TransportResponseHandler<UnicastPingResponse>() {
|
||||
@Override
|
||||
public UnicastPingResponse newInstance() {
|
||||
return original.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(UnicastPingResponse response) {
|
||||
original.handleResponse(response);
|
||||
markTaskAsCompleted("ping [" + node + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
original.handleException(exp);
|
||||
markTaskAsCompleted("ping [" + node + "] (error)");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return original.executor();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class ZenPingTests extends ESTestCase {
|
|||
Collections.shuffle(pings, random());
|
||||
|
||||
ZenPing.PingCollection collection = new ZenPing.PingCollection();
|
||||
collection.addPings(pings);
|
||||
pings.forEach(collection::addPing);
|
||||
|
||||
List<ZenPing.PingResponse> aggregate = collection.toList();
|
||||
|
||||
|
|
|
@ -208,8 +208,8 @@ public class TCPTransportTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public NodeChannels getConnection(DiscoveryNode node) {
|
||||
return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()],
|
||||
ConnectionProfile.LIGHT_PROFILE);
|
||||
return new NodeChannels(node, new Object[MockTcpTransport.LIGHT_PROFILE.getNumConnections()],
|
||||
MockTcpTransport.LIGHT_PROFILE);
|
||||
}
|
||||
};
|
||||
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
|
|
|
@ -113,7 +113,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|||
emptyMap(),
|
||||
emptySet(),
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
|
||||
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, MockTcpTransport.LIGHT_PROFILE)){
|
||||
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
|
||||
assertNotNull(connectedNode);
|
||||
// the name and version should be updated
|
||||
|
@ -121,16 +121,6 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|||
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
|
||||
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
|
||||
}
|
||||
|
||||
DiscoveryNode connectedNode =
|
||||
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
|
||||
assertNotNull(connectedNode);
|
||||
|
||||
// the name and version should be updated
|
||||
assertEquals(connectedNode.getName(), "TS_B");
|
||||
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
|
||||
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
|
||||
|
||||
}
|
||||
|
||||
public void testMismatchedClusterName() {
|
||||
|
@ -145,7 +135,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
|
||||
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
|
||||
ConnectionProfile.LIGHT_PROFILE)) {
|
||||
MockTcpTransport.LIGHT_PROFILE)) {
|
||||
handleA.transportService.handshake(connection, timeout);
|
||||
}
|
||||
});
|
||||
|
@ -166,7 +156,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
|||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
|
||||
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
|
||||
ConnectionProfile.LIGHT_PROFILE)) {
|
||||
MockTcpTransport.LIGHT_PROFILE)) {
|
||||
handleA.transportService.handshake(connection, timeout);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -43,7 +43,7 @@ import java.util.stream.Collectors;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
|
||||
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes;
|
||||
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
|
||||
|
||||
/**
|
||||
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
|
||||
|
@ -97,13 +97,13 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
|
|||
|
||||
final List<DiscoveryNode> discoNodes = new ArrayList<>();
|
||||
try {
|
||||
discoNodes.addAll(resolveDiscoveryNodes(
|
||||
discoNodes.addAll(resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
hostsList,
|
||||
1,
|
||||
transportService,
|
||||
() -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#",
|
||||
UNICAST_HOST_PREFIX,
|
||||
resolveTimeout));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -33,9 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.MockTcpTransport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
|
@ -44,7 +42,6 @@ import java.nio.file.Path;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
|
@ -99,13 +96,13 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment
|
||||
assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress());
|
||||
assertEquals(9300, nodes.get(0).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "1#", nodes.get(0).getId());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.1_0#", nodes.get(0).getId());
|
||||
assertEquals("192.168.0.2", nodes.get(1).getAddress().getAddress());
|
||||
assertEquals(9305, nodes.get(1).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "2#", nodes.get(1).getId());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.2:9305_0#", nodes.get(1).getId());
|
||||
assertEquals("255.255.23.15", nodes.get(2).getAddress().getAddress());
|
||||
assertEquals(9300, nodes.get(2).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "3#", nodes.get(2).getId());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "255.255.23.15_0#", nodes.get(2).getId());
|
||||
}
|
||||
|
||||
public void testEmptyUnicastHostsFile() throws Exception {
|
||||
|
|
|
@ -28,10 +28,9 @@ import org.elasticsearch.discovery.zen.PingContextProvider;
|
|||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
|
||||
|
@ -62,7 +61,7 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void ping(PingListener listener, TimeValue timeout) {
|
||||
public void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout) {
|
||||
logger.info("pinging using mock zen ping");
|
||||
synchronized (activeNodesPerCluster) {
|
||||
Set<MockZenPing> activeNodes = getActiveNodesForCurrentCluster();
|
||||
|
@ -76,11 +75,12 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
|
|||
activeNodes = getActiveNodesForCurrentCluster();
|
||||
}
|
||||
lastDiscoveredPings = activeNodes;
|
||||
List<PingResponse> responseList = activeNodes.stream()
|
||||
PingCollection pingCollection = new PingCollection();
|
||||
activeNodes.stream()
|
||||
.filter(p -> p != this) // remove this as pings are not expected to return the local node
|
||||
.map(MockZenPing::getPingResponse)
|
||||
.collect(Collectors.toList());
|
||||
listener.onPing(responseList);
|
||||
.forEach(pingCollection::addPing);
|
||||
resultsConsumer.accept(pingCollection);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,9 +29,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
|
@ -54,8 +52,6 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -1358,7 +1354,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
// all is well
|
||||
}
|
||||
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
|
||||
serviceB.handshake(connection, 100);
|
||||
fail("exception should be thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
|
@ -1416,7 +1412,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
// all is well
|
||||
}
|
||||
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
|
||||
serviceB.handshake(connection, 100);
|
||||
fail("exception should be thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
|
|
|
@ -66,6 +66,23 @@ import java.util.function.Consumer;
|
|||
*/
|
||||
public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel> {
|
||||
|
||||
/**
|
||||
* A pre-built light connection profile that shares a single connection across all
|
||||
* types.
|
||||
*/
|
||||
public static final ConnectionProfile LIGHT_PROFILE;
|
||||
|
||||
static {
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.addConnections(1,
|
||||
TransportRequestOptions.Type.BULK,
|
||||
TransportRequestOptions.Type.PING,
|
||||
TransportRequestOptions.Type.RECOVERY,
|
||||
TransportRequestOptions.Type.REG,
|
||||
TransportRequestOptions.Type.STATE);
|
||||
LIGHT_PROFILE = builder.build();
|
||||
}
|
||||
|
||||
private final ExecutorService executor;
|
||||
private final Version mockVersion;
|
||||
|
||||
|
@ -159,7 +176,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
|||
@Override
|
||||
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
final MockChannel[] mockChannels = new MockChannel[1];
|
||||
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here
|
||||
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here
|
||||
boolean success = false;
|
||||
final Socket socket = new Socket();
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue