Extract a ConnectionManager interface (#51722)
Currently we have three different implementations representing a `ConnectionManager`. There is the basic `ConnectionManager` which holds all connections for a cluster. And a remote connection manager which support proxy behavior. And a stubbable connection manager for tests. The remote and stubbable instances use the delegate pattern, so this commit extracts an interface for them all to implement.
This commit is contained in:
parent
e752221fc6
commit
a742c58d45
|
@ -0,0 +1,281 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ListenableFuture;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This class manages node connections within a cluster. The connection is opened by the underlying transport.
|
||||
* Once the connection is opened, this class manages the connection. This includes closing the connection when
|
||||
* the connection manager is closed.
|
||||
*/
|
||||
public class ClusterConnectionManager implements ConnectionManager {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class);
|
||||
|
||||
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
|
||||
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
|
||||
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
|
||||
@Override
|
||||
protected void closeInternal() {
|
||||
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
|
||||
try {
|
||||
IOUtils.closeWhileHandlingException(next.getValue());
|
||||
} finally {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
closeLatch.countDown();
|
||||
}
|
||||
};
|
||||
private final Transport transport;
|
||||
private final ConnectionProfile defaultProfile;
|
||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||
private final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
|
||||
|
||||
public ClusterConnectionManager(Settings settings, Transport transport) {
|
||||
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
|
||||
}
|
||||
|
||||
public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
|
||||
this.transport = transport;
|
||||
this.defaultProfile = connectionProfile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(TransportConnectionListener listener) {
|
||||
this.connectionListener.addListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListener(TransportConnectionListener listener) {
|
||||
this.connectionListener.removeListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
|
||||
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
||||
internalOpenConnection(node, resolvedProfile, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
|
||||
* Once a successful is established, it can be validated before being exposed.
|
||||
* The ActionListener will be called on the calling thread or the generic thread pool.
|
||||
*/
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ConnectionValidator connectionValidator,
|
||||
ActionListener<Void> listener) throws ConnectTransportException {
|
||||
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
||||
if (node == null) {
|
||||
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectingRefCounter.tryIncRef() == false) {
|
||||
listener.onFailure(new IllegalStateException("connection manager is closed"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectedNodes.containsKey(node)) {
|
||||
connectingRefCounter.decRef();
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
|
||||
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
|
||||
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
|
||||
if (existingListener != null) {
|
||||
try {
|
||||
// wait on previous entry to complete connection attempt
|
||||
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
||||
} finally {
|
||||
connectingRefCounter.decRef();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
||||
|
||||
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
|
||||
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
|
||||
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
|
||||
ignored -> {
|
||||
assert Transports.assertNotTransportThread("connection validator success");
|
||||
try {
|
||||
if (connectedNodes.putIfAbsent(node, conn) != null) {
|
||||
logger.debug("existing connection to node [{}], closing new redundant connection", node);
|
||||
IOUtils.closeWhileHandlingException(conn);
|
||||
} else {
|
||||
logger.debug("connected to node [{}]", node);
|
||||
try {
|
||||
connectionListener.onNodeConnected(node, conn);
|
||||
} finally {
|
||||
final Transport.Connection finalConnection = conn;
|
||||
conn.addCloseListener(ActionListener.wrap(() -> {
|
||||
logger.trace("unregistering {} after connection close and marking as disconnected", node);
|
||||
connectedNodes.remove(node, finalConnection);
|
||||
connectionListener.onNodeDisconnected(node, conn);
|
||||
}));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
ListenableFuture<Void> future = pendingConnections.remove(node);
|
||||
assert future == currentListener : "Listener in pending map is different than the expected listener";
|
||||
releaseOnce.run();
|
||||
future.onResponse(null);
|
||||
}
|
||||
}, e -> {
|
||||
assert Transports.assertNotTransportThread("connection validator failure");
|
||||
IOUtils.closeWhileHandlingException(conn);
|
||||
failConnectionListeners(node, releaseOnce, e, currentListener);
|
||||
}));
|
||||
}, e -> {
|
||||
assert Transports.assertNotTransportThread("internalOpenConnection failure");
|
||||
failConnectionListeners(node, releaseOnce, e, currentListener);
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a connection for the given node if the node is connected.
|
||||
* Connections returned from this method must not be closed. The lifecycle of this connection is
|
||||
* maintained by this connection manager
|
||||
*
|
||||
* @throws NodeNotConnectedException if the node is not connected
|
||||
* @see #connectToNode(DiscoveryNode, ConnectionProfile, ConnectionValidator, ActionListener)
|
||||
*/
|
||||
@Override
|
||||
public Transport.Connection getConnection(DiscoveryNode node) {
|
||||
Transport.Connection connection = connectedNodes.get(node);
|
||||
if (connection == null) {
|
||||
throw new NodeNotConnectedException(node, "Node not connected");
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the node is connected.
|
||||
*/
|
||||
@Override
|
||||
public boolean nodeConnected(DiscoveryNode node) {
|
||||
return connectedNodes.containsKey(node);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnected from the given node, if not connected, will do nothing.
|
||||
*/
|
||||
@Override
|
||||
public void disconnectFromNode(DiscoveryNode node) {
|
||||
Transport.Connection nodeChannels = connectedNodes.remove(node);
|
||||
if (nodeChannels != null) {
|
||||
// if we found it and removed it we close
|
||||
nodeChannels.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of nodes this manager is connected to.
|
||||
*/
|
||||
@Override
|
||||
public int size() {
|
||||
return connectedNodes.size();
|
||||
}
|
||||
|
||||
public Set<DiscoveryNode> getAllConnectedNodes() {
|
||||
return Collections.unmodifiableSet(connectedNodes.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
internalClose(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeNoBlock() {
|
||||
internalClose(false);
|
||||
}
|
||||
|
||||
private void internalClose(boolean waitForPendingConnections) {
|
||||
assert Transports.assertNotTransportThread("Closing ConnectionManager");
|
||||
if (closing.compareAndSet(false, true)) {
|
||||
connectingRefCounter.decRef();
|
||||
if (waitForPendingConnections) {
|
||||
try {
|
||||
closeLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ActionListener<Transport.Connection> listener) {
|
||||
transport.openConnection(node, connectionProfile, ActionListener.map(listener, connection -> {
|
||||
assert Transports.assertNotTransportThread("internalOpenConnection success");
|
||||
try {
|
||||
connectionListener.onConnectionOpened(connection);
|
||||
} finally {
|
||||
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
|
||||
}
|
||||
if (connection.isClosed()) {
|
||||
throw new ConnectTransportException(node, "a channel closed while connecting");
|
||||
}
|
||||
return connection;
|
||||
}));
|
||||
}
|
||||
|
||||
private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture<Void> expectedListener) {
|
||||
ListenableFuture<Void> future = pendingConnections.remove(node);
|
||||
releaseOnce.run();
|
||||
if (future != null) {
|
||||
assert future == expectedListener : "Listener in pending map is different than the expected listener";
|
||||
future.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionProfile getConnectionProfile() {
|
||||
return defaultProfile;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,266 +16,48 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ListenableFuture;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This class manages node connections. The connection is opened by the underlying transport. Once the
|
||||
* connection is opened, this class manages the connection. This includes keep-alive pings and closing
|
||||
* the connection when the connection manager is closed.
|
||||
*/
|
||||
public class ConnectionManager implements Closeable {
|
||||
public interface ConnectionManager extends Closeable {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
|
||||
void addListener(TransportConnectionListener listener);
|
||||
|
||||
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
|
||||
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
|
||||
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
|
||||
@Override
|
||||
protected void closeInternal() {
|
||||
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
|
||||
try {
|
||||
IOUtils.closeWhileHandlingException(next.getValue());
|
||||
} finally {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
closeLatch.countDown();
|
||||
}
|
||||
};
|
||||
private final Transport transport;
|
||||
private final ConnectionProfile defaultProfile;
|
||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||
private final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
|
||||
void removeListener(TransportConnectionListener listener);
|
||||
|
||||
public ConnectionManager(Settings settings, Transport transport) {
|
||||
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
|
||||
}
|
||||
void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener);
|
||||
|
||||
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
|
||||
this.transport = transport;
|
||||
this.defaultProfile = connectionProfile;
|
||||
}
|
||||
void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ConnectionValidator connectionValidator,
|
||||
ActionListener<Void> listener) throws ConnectTransportException;
|
||||
|
||||
public void addListener(TransportConnectionListener listener) {
|
||||
this.connectionListener.listeners.addIfAbsent(listener);
|
||||
}
|
||||
Transport.Connection getConnection(DiscoveryNode node);
|
||||
|
||||
public void removeListener(TransportConnectionListener listener) {
|
||||
this.connectionListener.listeners.remove(listener);
|
||||
}
|
||||
boolean nodeConnected(DiscoveryNode node);
|
||||
|
||||
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
|
||||
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
||||
internalOpenConnection(node, resolvedProfile, listener);
|
||||
}
|
||||
void disconnectFromNode(DiscoveryNode node);
|
||||
|
||||
int size();
|
||||
|
||||
@Override
|
||||
void close();
|
||||
|
||||
void closeNoBlock();
|
||||
|
||||
ConnectionProfile getConnectionProfile();
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ConnectionValidator {
|
||||
interface ConnectionValidator {
|
||||
void validate(Transport.Connection connection, ConnectionProfile profile, ActionListener<Void> listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
|
||||
* Once a successful is established, it can be validated before being exposed.
|
||||
* The ActionListener will be called on the calling thread or the generic thread pool.
|
||||
*/
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ConnectionValidator connectionValidator,
|
||||
ActionListener<Void> listener) throws ConnectTransportException {
|
||||
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
||||
if (node == null) {
|
||||
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectingRefCounter.tryIncRef() == false) {
|
||||
listener.onFailure(new IllegalStateException("connection manager is closed"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectedNodes.containsKey(node)) {
|
||||
connectingRefCounter.decRef();
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
|
||||
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
|
||||
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
|
||||
if (existingListener != null) {
|
||||
try {
|
||||
// wait on previous entry to complete connection attempt
|
||||
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
||||
} finally {
|
||||
connectingRefCounter.decRef();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
||||
|
||||
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
|
||||
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
|
||||
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
|
||||
ignored -> {
|
||||
assert Transports.assertNotTransportThread("connection validator success");
|
||||
try {
|
||||
if (connectedNodes.putIfAbsent(node, conn) != null) {
|
||||
logger.debug("existing connection to node [{}], closing new redundant connection", node);
|
||||
IOUtils.closeWhileHandlingException(conn);
|
||||
} else {
|
||||
logger.debug("connected to node [{}]", node);
|
||||
try {
|
||||
connectionListener.onNodeConnected(node, conn);
|
||||
} finally {
|
||||
final Transport.Connection finalConnection = conn;
|
||||
conn.addCloseListener(ActionListener.wrap(() -> {
|
||||
logger.trace("unregistering {} after connection close and marking as disconnected", node);
|
||||
connectedNodes.remove(node, finalConnection);
|
||||
connectionListener.onNodeDisconnected(node, conn);
|
||||
}));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
ListenableFuture<Void> future = pendingConnections.remove(node);
|
||||
assert future == currentListener : "Listener in pending map is different than the expected listener";
|
||||
releaseOnce.run();
|
||||
future.onResponse(null);
|
||||
}
|
||||
}, e -> {
|
||||
assert Transports.assertNotTransportThread("connection validator failure");
|
||||
IOUtils.closeWhileHandlingException(conn);
|
||||
failConnectionListeners(node, releaseOnce, e, currentListener);
|
||||
}));
|
||||
}, e -> {
|
||||
assert Transports.assertNotTransportThread("internalOpenConnection failure");
|
||||
failConnectionListeners(node, releaseOnce, e, currentListener);
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a connection for the given node if the node is connected.
|
||||
* Connections returned from this method must not be closed. The lifecycle of this connection is
|
||||
* maintained by this connection manager
|
||||
*
|
||||
* @throws NodeNotConnectedException if the node is not connected
|
||||
* @see #connectToNode(DiscoveryNode, ConnectionProfile, ConnectionValidator, ActionListener)
|
||||
*/
|
||||
public Transport.Connection getConnection(DiscoveryNode node) {
|
||||
Transport.Connection connection = connectedNodes.get(node);
|
||||
if (connection == null) {
|
||||
throw new NodeNotConnectedException(node, "Node not connected");
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the node is connected.
|
||||
*/
|
||||
public boolean nodeConnected(DiscoveryNode node) {
|
||||
return connectedNodes.containsKey(node);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnected from the given node, if not connected, will do nothing.
|
||||
*/
|
||||
public void disconnectFromNode(DiscoveryNode node) {
|
||||
Transport.Connection nodeChannels = connectedNodes.remove(node);
|
||||
if (nodeChannels != null) {
|
||||
// if we found it and removed it we close
|
||||
nodeChannels.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of nodes this manager is connected to.
|
||||
*/
|
||||
public int size() {
|
||||
return connectedNodes.size();
|
||||
}
|
||||
|
||||
public Set<DiscoveryNode> getAllConnectedNodes() {
|
||||
return Collections.unmodifiableSet(connectedNodes.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
internalClose(true);
|
||||
}
|
||||
|
||||
public void closeNoBlock() {
|
||||
internalClose(false);
|
||||
}
|
||||
|
||||
private void internalClose(boolean waitForPendingConnections) {
|
||||
assert Transports.assertNotTransportThread("Closing ConnectionManager");
|
||||
if (closing.compareAndSet(false, true)) {
|
||||
connectingRefCounter.decRef();
|
||||
if (waitForPendingConnections) {
|
||||
try {
|
||||
closeLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ActionListener<Transport.Connection> listener) {
|
||||
transport.openConnection(node, connectionProfile, ActionListener.map(listener, connection -> {
|
||||
assert Transports.assertNotTransportThread("internalOpenConnection success");
|
||||
try {
|
||||
connectionListener.onConnectionOpened(connection);
|
||||
} finally {
|
||||
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
|
||||
}
|
||||
if (connection.isClosed()) {
|
||||
throw new ConnectTransportException(node, "a channel closed while connecting");
|
||||
}
|
||||
return connection;
|
||||
}));
|
||||
}
|
||||
|
||||
private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture<Void> expectedListener) {
|
||||
ListenableFuture<Void> future = pendingConnections.remove(node);
|
||||
releaseOnce.run();
|
||||
if (future != null) {
|
||||
assert future == expectedListener : "Listener in pending map is different than the expected listener";
|
||||
future.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionProfile getConnectionProfile() {
|
||||
return defaultProfile;
|
||||
}
|
||||
|
||||
private static final class DelegatingNodeConnectionListener implements TransportConnectionListener {
|
||||
final class DelegatingNodeConnectionListener implements TransportConnectionListener {
|
||||
|
||||
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
|
@ -306,5 +88,13 @@ public class ConnectionManager implements Closeable {
|
|||
listener.onConnectionClosed(connection);
|
||||
}
|
||||
}
|
||||
|
||||
public void addListener(TransportConnectionListener listener) {
|
||||
listeners.addIfAbsent(listener);
|
||||
}
|
||||
|
||||
public void removeListener(TransportConnectionListener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ final class RemoteClusterConnection implements Closeable {
|
|||
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
|
||||
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
|
||||
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
|
||||
this.remoteConnectionManager.getConnectionManager().addListener(transportService);
|
||||
this.remoteConnectionManager.addListener(transportService);
|
||||
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
|
||||
.getConcreteSettingForNamespace(clusterAlias).get(settings);
|
||||
this.threadPool = transportService.threadPool;
|
||||
|
@ -171,7 +171,7 @@ final class RemoteClusterConnection implements Closeable {
|
|||
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
|
||||
*/
|
||||
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
|
||||
return remoteConnectionManager.getRemoteConnection(remoteClusterNode);
|
||||
return remoteConnectionManager.getConnection(remoteClusterNode);
|
||||
}
|
||||
|
||||
Transport.Connection getConnection() {
|
||||
|
@ -193,7 +193,7 @@ final class RemoteClusterConnection implements Closeable {
|
|||
}
|
||||
|
||||
boolean isNodeConnected(final DiscoveryNode node) {
|
||||
return remoteConnectionManager.getConnectionManager().nodeConnected(node);
|
||||
return remoteConnectionManager.nodeConnected(node);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -208,11 +208,11 @@ final class RemoteClusterConnection implements Closeable {
|
|||
}
|
||||
|
||||
private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
|
||||
return new ConnectionManager(connectionProfile, transportService.transport);
|
||||
return new ClusterConnectionManager(connectionProfile, transportService.transport);
|
||||
}
|
||||
|
||||
ConnectionManager getConnectionManager() {
|
||||
return remoteConnectionManager.getConnectionManager();
|
||||
return remoteConnectionManager;
|
||||
}
|
||||
|
||||
boolean shouldRebuildConnection(Settings newSettings) {
|
||||
|
|
|
@ -22,24 +22,23 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class RemoteConnectionManager implements Closeable {
|
||||
public class RemoteConnectionManager implements ConnectionManager {
|
||||
|
||||
private final String clusterAlias;
|
||||
private final ConnectionManager connectionManager;
|
||||
private final ConnectionManager delegate;
|
||||
private final AtomicLong counter = new AtomicLong();
|
||||
private volatile List<Transport.Connection> connections = Collections.emptyList();
|
||||
|
||||
RemoteConnectionManager(String clusterAlias, ConnectionManager connectionManager) {
|
||||
RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
|
||||
this.clusterAlias = clusterAlias;
|
||||
this.connectionManager = connectionManager;
|
||||
this.connectionManager.addListener(new TransportConnectionListener() {
|
||||
this.delegate = delegate;
|
||||
this.delegate.addListener(new TransportConnectionListener() {
|
||||
@Override
|
||||
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
|
||||
addConnection(connection);
|
||||
|
@ -52,24 +51,52 @@ public class RemoteConnectionManager implements Closeable {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
ConnectionManager.ConnectionValidator connectionValidator,
|
||||
ActionListener<Void> listener) throws ConnectTransportException {
|
||||
connectionManager.connectToNode(node, connectionProfile, connectionValidator, listener);
|
||||
delegate.connectToNode(node, connectionProfile, connectionValidator, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(TransportConnectionListener listener) {
|
||||
delegate.addListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListener(TransportConnectionListener listener) {
|
||||
delegate.removeListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
|
||||
connectionManager.openConnection(node, profile, listener);
|
||||
delegate.openConnection(node, profile, listener);
|
||||
}
|
||||
|
||||
public Transport.Connection getRemoteConnection(DiscoveryNode node) {
|
||||
@Override
|
||||
public Transport.Connection getConnection(DiscoveryNode node) {
|
||||
try {
|
||||
return connectionManager.getConnection(node);
|
||||
return delegate.getConnection(node);
|
||||
} catch (NodeNotConnectedException e) {
|
||||
return new ProxyConnection(getAnyRemoteConnection(), node);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeConnected(DiscoveryNode node) {
|
||||
return delegate.nodeConnected(node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnectFromNode(DiscoveryNode node) {
|
||||
delegate.disconnectFromNode(node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionProfile getConnectionProfile() {
|
||||
return delegate.getConnectionProfile();
|
||||
}
|
||||
|
||||
public Transport.Connection getAnyRemoteConnection() {
|
||||
List<Transport.Connection> localConnections = this.connections;
|
||||
if (localConnections.isEmpty()) {
|
||||
|
@ -81,16 +108,19 @@ public class RemoteConnectionManager implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public ConnectionManager getConnectionManager() {
|
||||
return connectionManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return connectionManager.size();
|
||||
return delegate.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
connectionManager.closeNoBlock();
|
||||
delegate.closeNoBlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeNoBlock() {
|
||||
delegate.closeNoBlock();
|
||||
}
|
||||
|
||||
private synchronized void addConnection(Transport.Connection addedConnection) {
|
||||
|
|
|
@ -117,7 +117,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
|||
this.clusterAlias = clusterAlias;
|
||||
this.transportService = transportService;
|
||||
this.connectionManager = connectionManager;
|
||||
connectionManager.getConnectionManager().addListener(this);
|
||||
connectionManager.addListener(this);
|
||||
}
|
||||
|
||||
static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings) {
|
||||
|
@ -271,7 +271,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
|||
.getConcreteSettingForNamespace(clusterAlias)
|
||||
.get(newSettings);
|
||||
|
||||
ConnectionProfile oldProfile = connectionManager.getConnectionManager().getConnectionProfile();
|
||||
ConnectionProfile oldProfile = connectionManager.getConnectionProfile();
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile);
|
||||
builder.setCompressionEnabled(compressionEnabled);
|
||||
builder.setPingInterval(pingSchedule);
|
||||
|
@ -299,7 +299,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
|||
final List<ActionListener<Void>> toNotify;
|
||||
synchronized (mutex) {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
connectionManager.getConnectionManager().removeListener(this);
|
||||
connectionManager.removeListener(this);
|
||||
toNotify = listeners;
|
||||
listeners = Collections.emptyList();
|
||||
} else {
|
||||
|
|
|
@ -316,7 +316,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
|
||||
final StepListener<TransportService.HandshakeResponse> handshakeStep = new StepListener<>();
|
||||
openConnectionStep.whenComplete(connection -> {
|
||||
ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile();
|
||||
ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
|
||||
transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(),
|
||||
getRemoteClusterNamePredicate(), handshakeStep);
|
||||
}, onFailure);
|
||||
|
|
|
@ -152,7 +152,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
|
||||
Set<String> taskHeaders) {
|
||||
this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
|
||||
new ConnectionManager(settings, transport));
|
||||
new ClusterConnectionManager(settings, transport));
|
||||
}
|
||||
|
||||
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.test.transport.CapturingTransport;
|
|||
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
|
||||
import org.elasticsearch.test.transport.StubbableConnectionManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.ClusterConnectionManager;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
|
@ -211,9 +212,9 @@ public class PeerFinderTests extends ESTestCase {
|
|||
localNode = newDiscoveryNode("local-node");
|
||||
|
||||
ConnectionManager innerConnectionManager
|
||||
= new ConnectionManager(settings, capturingTransport);
|
||||
= new ClusterConnectionManager(settings, capturingTransport);
|
||||
StubbableConnectionManager connectionManager
|
||||
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport);
|
||||
= new StubbableConnectionManager(innerConnectionManager);
|
||||
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
|
||||
final boolean isConnected = connectedNodes.contains(discoveryNode);
|
||||
final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
|
||||
|
|
|
@ -49,9 +49,9 @@ import static org.mockito.Matchers.eq;
|
|||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ConnectionManagerTests extends ESTestCase {
|
||||
public class ClusterConnectionManagerTests extends ESTestCase {
|
||||
|
||||
private ConnectionManager connectionManager;
|
||||
private ClusterConnectionManager connectionManager;
|
||||
private ThreadPool threadPool;
|
||||
private Transport transport;
|
||||
private ConnectionProfile connectionProfile;
|
||||
|
@ -59,11 +59,11 @@ public class ConnectionManagerTests extends ESTestCase {
|
|||
@Before
|
||||
public void createConnectionManager() {
|
||||
Settings settings = Settings.builder()
|
||||
.put("node.name", ConnectionManagerTests.class.getSimpleName())
|
||||
.put("node.name", ClusterConnectionManagerTests.class.getSimpleName())
|
||||
.build();
|
||||
threadPool = new ThreadPool(settings);
|
||||
transport = mock(Transport.class);
|
||||
connectionManager = new ConnectionManager(settings, transport);
|
||||
connectionManager = new ClusterConnectionManager(settings, transport);
|
||||
TimeValue oneSecond = new TimeValue(1000);
|
||||
TimeValue oneMinute = TimeValue.timeValueMinutes(1);
|
||||
connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond,
|
||||
|
@ -321,6 +321,11 @@ public class ConnectionManagerTests extends ESTestCase {
|
|||
return node;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return node.getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws TransportException {
|
|
@ -90,7 +90,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
|
@ -119,7 +119,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
|
||||
AtomicBoolean useAddress1 = new AtomicBoolean(true);
|
||||
|
@ -169,7 +169,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
|
@ -199,7 +199,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
|
||||
AtomicBoolean useAddress1 = new AtomicBoolean(true);
|
||||
|
@ -251,7 +251,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
|
@ -276,7 +276,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
|
@ -359,7 +359,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
|
||||
String address = "localhost:" + address1.getPort();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
|
|
|
@ -547,7 +547,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
DiscoveryNode node = randomFrom(discoverableNodes);
|
||||
try {
|
||||
connection.getConnectionManager().getConnection(node);
|
||||
} catch (NodeNotConnectedException e) {
|
||||
} catch (NoSuchRemoteClusterException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ public class RemoteConnectionManagerTests extends ESTestCase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
transport = mock(Transport.class);
|
||||
remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ConnectionManager(Settings.EMPTY, transport));
|
||||
remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ClusterConnectionManager(Settings.EMPTY, transport));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -72,25 +72,25 @@ public class RemoteConnectionManagerTests extends ESTestCase {
|
|||
remoteConnectionManager.connectToNode(node2, null, validator, future2);
|
||||
assertTrue(future2.isDone());
|
||||
|
||||
assertEquals(node1, remoteConnectionManager.getRemoteConnection(node1).getNode());
|
||||
assertEquals(node2, remoteConnectionManager.getRemoteConnection(node2).getNode());
|
||||
assertEquals(node1, remoteConnectionManager.getConnection(node1).getNode());
|
||||
assertEquals(node2, remoteConnectionManager.getConnection(node2).getNode());
|
||||
|
||||
DiscoveryNode node4 = new DiscoveryNode("node-4", address, Version.CURRENT);
|
||||
assertThat(remoteConnectionManager.getRemoteConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class));
|
||||
assertThat(remoteConnectionManager.getConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class));
|
||||
|
||||
// Test round robin
|
||||
Set<Version> versions = new HashSet<>();
|
||||
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
|
||||
|
||||
assertThat(versions, hasItems(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
|
||||
|
||||
// Test that the connection is cleared from the round robin list when it is closed
|
||||
remoteConnectionManager.getRemoteConnection(node1).close();
|
||||
remoteConnectionManager.getConnection(node1).close();
|
||||
|
||||
versions.clear();
|
||||
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
|
||||
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
|
||||
|
||||
assertThat(versions, hasItems(Version.CURRENT.minimumCompatibilityVersion()));
|
||||
assertEquals(1, versions.size());
|
||||
|
@ -109,6 +109,11 @@ public class RemoteConnectionManagerTests extends ESTestCase {
|
|||
return node;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getVersion() {
|
||||
return node.getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws TransportException {
|
||||
|
|
|
@ -30,7 +30,7 @@ import static org.mockito.Mockito.mock;
|
|||
public class RemoteConnectionStrategyTests extends ESTestCase {
|
||||
|
||||
public void testStrategyChangeMeansThatStrategyMustBeRebuilt() {
|
||||
ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class));
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class));
|
||||
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
|
||||
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
|
||||
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
||||
|
@ -41,7 +41,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() {
|
||||
ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class));
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class));
|
||||
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
|
||||
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
|
||||
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
||||
|
@ -52,7 +52,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() {
|
||||
ConnectionManager connectionManager = new ConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class));
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class));
|
||||
assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval());
|
||||
assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled());
|
||||
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
|
||||
|
|
|
@ -125,7 +125,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode))) {
|
||||
|
@ -162,7 +162,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode), Collections.singletonList(seedNodeSupplier))) {
|
||||
|
@ -198,7 +198,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 2, n -> true, seedNodes(seedNode))) {
|
||||
|
@ -244,7 +244,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode))) {
|
||||
|
@ -273,7 +273,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(incompatibleSeedNode))) {
|
||||
|
@ -303,7 +303,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> n.equals(rejectedNode) == false, seedNodes(seedNode))) {
|
||||
|
@ -347,7 +347,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode, otherSeedNode))) {
|
||||
|
@ -395,7 +395,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode))) {
|
||||
|
@ -465,7 +465,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
|
||||
List<String> seedNodes = Collections.singletonList(accessibleNode.toString());
|
||||
TransportAddress proxyAddress = accessibleNode.getAddress();
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
proxyAddress.toString(), 3, n -> true, seedNodes)) {
|
||||
|
@ -502,7 +502,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> true, seedNodes(seedNode))) {
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.CloseableConnection;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.ClusterConnectionManager;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
|
@ -79,8 +79,7 @@ public class MockTransport implements Transport, LifecycleComponent {
|
|||
public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor,
|
||||
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
|
||||
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
|
||||
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
|
||||
settings, this);
|
||||
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ClusterConnectionManager(settings, this));
|
||||
connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false);
|
||||
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
|
||||
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
|
||||
|
|
|
@ -48,7 +48,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.ClusterConnectionManager;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
@ -160,7 +160,7 @@ public final class MockTransportService extends TransportService {
|
|||
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
|
||||
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
|
||||
super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
|
||||
new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport));
|
||||
new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)));
|
||||
this.original = transport.getDelegate();
|
||||
}
|
||||
|
||||
|
|
|
@ -20,26 +20,24 @@ package org.elasticsearch.test.transport;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class StubbableConnectionManager extends ConnectionManager {
|
||||
public class StubbableConnectionManager implements ConnectionManager {
|
||||
|
||||
private final ConnectionManager delegate;
|
||||
private final ConcurrentMap<TransportAddress, GetConnectionBehavior> getConnectionBehaviors;
|
||||
private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection;
|
||||
private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
|
||||
|
||||
public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport) {
|
||||
super(settings, transport);
|
||||
public StubbableConnectionManager(ConnectionManager delegate) {
|
||||
this.delegate = delegate;
|
||||
this.getConnectionBehaviors = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
@ -118,6 +116,16 @@ public class StubbableConnectionManager extends ConnectionManager {
|
|||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeNoBlock() {
|
||||
delegate.closeNoBlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionProfile getConnectionProfile() {
|
||||
return delegate.getConnectionProfile();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface GetConnectionBehavior {
|
||||
Transport.Connection getConnection(ConnectionManager connectionManager, DiscoveryNode discoveryNode);
|
||||
|
|
Loading…
Reference in New Issue