mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-05 20:48:22 +00:00
first review round
This commit is contained in:
parent
22438855d3
commit
7f6c89f9a8
@ -83,7 +83,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
private final ConnectionProfile remoteProfile;
|
||||
private final CopyOnWriteArrayList<DiscoveryNode> connectedNodes = new CopyOnWriteArrayList();
|
||||
private final Supplier<DiscoveryNode> nodeSupplier;
|
||||
private final String clusterName;
|
||||
private final String clusterAlias;
|
||||
private final int maxNumRemoteConnections;
|
||||
private final Predicate<DiscoveryNode> nodePredicate;
|
||||
private volatile List<DiscoveryNode> seedNodes;
|
||||
@ -92,19 +92,19 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
/**
|
||||
* Creates a new {@link RemoteClusterConnection}
|
||||
* @param settings the nodes settings object
|
||||
* @param clusterName the configured name of the cluster to connect to
|
||||
* @param clusterAlias the configured alias of the cluster to connect to
|
||||
* @param seedNodes a list of seed nodes to discover eligible nodes from
|
||||
* @param transportService the local nodes transport service
|
||||
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
|
||||
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
|
||||
*/
|
||||
RemoteClusterConnection(Settings settings, String clusterName, List<DiscoveryNode> seedNodes,
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
|
||||
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.maxNumRemoteConnections = maxNumRemoteConnections;
|
||||
this.nodePredicate = nodePredicate;
|
||||
this.clusterName = clusterName;
|
||||
this.clusterAlias = clusterAlias;
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
@ -121,7 +121,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
if (current == null || current.hasNext() == false) {
|
||||
current = connectedNodes.iterator();
|
||||
if (current.hasNext() == false) {
|
||||
throw new IllegalStateException("No node available for cluster: " + clusterName + " nodes: " + connectedNodes);
|
||||
throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes);
|
||||
}
|
||||
}
|
||||
return current.next();
|
||||
@ -225,10 +225,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the remote cluster
|
||||
* Returns the alias / name of the remote cluster
|
||||
*/
|
||||
public String getClusterName() {
|
||||
return clusterName;
|
||||
public String getClusterAlias() {
|
||||
return clusterAlias;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -370,7 +370,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
request.nodes(true);
|
||||
transportService.sendRequest(connection,
|
||||
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||
new StateResponseHandler(transportService, connection, listener, seedNodes, cancellableThreads));
|
||||
new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes,
|
||||
cancellableThreads));
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
@ -387,7 +388,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
// ISE if we fail the handshake with an version incompatible node
|
||||
if (seedNodes.hasNext()) {
|
||||
logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
|
||||
clusterName), ex);
|
||||
clusterAlias), ex);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
} else {
|
||||
listener.onFailure(ex);
|
||||
@ -408,7 +409,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
}
|
||||
}
|
||||
|
||||
private class StateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
|
||||
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
|
||||
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
|
||||
|
||||
private final TransportService transportService;
|
||||
private final Transport.Connection connection;
|
||||
@ -416,8 +418,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
private final Iterator<DiscoveryNode> seedNodes;
|
||||
private final CancellableThreads cancellableThreads;
|
||||
|
||||
StateResponseHandler(TransportService transportService, Transport.Connection connection, ActionListener<Void> listener,
|
||||
Iterator<DiscoveryNode> seedNodes, CancellableThreads cancellableThreads) {
|
||||
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
|
||||
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
|
||||
CancellableThreads cancellableThreads) {
|
||||
this.transportService = transportService;
|
||||
this.connection = connection;
|
||||
this.listener = listener;
|
||||
@ -457,7 +460,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
} catch (Exception ex) {
|
||||
logger.warn((Supplier<?>)
|
||||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
|
||||
clusterName), ex);
|
||||
clusterAlias), ex);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
} finally {
|
||||
// just to make sure we don't leak anything we close the connection here again even if we managed to do so before
|
||||
@ -468,7 +471,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn((Supplier<?>)
|
||||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterName),
|
||||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias),
|
||||
exp);
|
||||
try {
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
|
@ -134,16 +134,16 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
|
||||
remoteClusters.put(entry.getKey(), remote);
|
||||
}
|
||||
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap(
|
||||
x -> {
|
||||
response -> {
|
||||
if (countDown.countDown()) {
|
||||
connectionListener.onResponse(x);
|
||||
connectionListener.onResponse(response);
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
exception -> {
|
||||
if (countDown.fastForward()) {
|
||||
connectionListener.onFailure(e);
|
||||
connectionListener.onFailure(exception);
|
||||
}
|
||||
logger.error("failed to update seed list for cluster: " + entry.getKey(), e);
|
||||
logger.error("failed to update seed list for cluster: " + entry.getKey(), exception);
|
||||
}));
|
||||
}
|
||||
}
|
||||
@ -314,7 +314,6 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
|
||||
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost,
|
||||
new TransportAddress(new InetSocketAddress(hostAddress, port)),
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
//don't connect yet as that would require the remote node to be up and would fail the local node startup otherwise
|
||||
List<DiscoveryNode> nodes = remoteClustersNodes.get(clusterName);
|
||||
if (nodes == null) {
|
||||
nodes = new ArrayList<>();
|
||||
|
@ -204,7 +204,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||
connection = remoteConnections.apply(nodeId);
|
||||
}
|
||||
if (connection == null) {
|
||||
throw new IllegalArgumentException("no node found for id: " + nodeId);
|
||||
throw new IllegalStateException("no node found for id: " + nodeId);
|
||||
}
|
||||
return connection;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user