Stop opening PING conns to remote clusters (#61408)

Today a remote cluster connection comprises a `PING` and a `REG`
channel. The `PING` channel is only used for health checks between the
elected master and the members of its own cluster, so is unused in a
remote cluster connection. This commit removes this unused connection.
This commit is contained in:
David Turner 2020-08-21 12:21:36 +01:00
parent cb83e7011c
commit 078e8717ee
2 changed files with 34 additions and 4 deletions

View File

@ -138,9 +138,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings))
.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
// TODO: Evaluate if we actually need PING channels?
.addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING);
TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.PING)
.addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG);
return builder.build();
}
@ -211,7 +210,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
int port = Integer.parseInt(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}

View File

@ -82,6 +82,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -492,6 +493,36 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testNoChannelsExceptREG() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
PlainActionFuture<Void> plainActionFuture = new PlainActionFuture<>();
connection.ensureConnected(plainActionFuture);
plainActionFuture.get(10, TimeUnit.SECONDS);
for (TransportRequestOptions.Type type : TransportRequestOptions.Type.values()) {
if (type != TransportRequestOptions.Type.REG) {
assertThat(expectThrows(IllegalStateException.class,
() -> connection.getConnection().sendRequest(randomNonNegativeLong(),
"arbitrary", TransportRequest.Empty.INSTANCE,
TransportRequestOptions.builder().withType(type).build())).getMessage(),
allOf(containsString("can't select"), containsString(type.toString())));
}
}
}
}
}
}
public void testConnectedNodesConcurrentAccess() throws IOException, InterruptedException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
List<MockTransportService> discoverableTransports = new CopyOnWriteArrayList<>();