mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 05:15:04 +00:00
Reduce channels in AbstractSimpleTransportTestCase (#34863)
This is related to #30876. The AbstractSimpleTransportTestCase initiates many tcp connections. There are normally over 1,000 connections in TIME_WAIT at the end of the test. This is because every test opens at least two different transports that connect to each other with 13 channel connection profiles. This commit modifies the default connection profile used by this test to 6. One connection for each type, except for REG which gets 2 connections.
This commit is contained in:
parent
312df5546c
commit
cf9aff954e
@ -73,7 +73,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
||||
}
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
||||
}
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable {
|
||||
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
|
||||
|
||||
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
|
||||
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
|
||||
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
|
||||
}
|
||||
|
||||
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
|
||||
@ -323,23 +323,4 @@ public class ConnectionManager implements Closeable {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
|
||||
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
|
||||
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
|
||||
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
|
||||
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
|
||||
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
|
||||
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
|
||||
// if we are not master eligible we don't need a dedicated channel to publish the state
|
||||
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
|
||||
// if we are not a data-node we don't need any dedicated channels for recovery
|
||||
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
|
||||
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,9 @@
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -91,6 +93,31 @@ public final class ConnectionProfile {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a default connection profile based on the provided settings.
|
||||
*
|
||||
* @param settings to build the connection profile from
|
||||
* @return the connection profile
|
||||
*/
|
||||
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
|
||||
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
|
||||
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
|
||||
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
|
||||
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
|
||||
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
|
||||
Builder builder = new Builder();
|
||||
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
|
||||
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
|
||||
// if we are not master eligible we don't need a dedicated channel to publish the state
|
||||
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
|
||||
// if we are not a data-node we don't need any dedicated channels for recovery
|
||||
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
|
||||
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* A builder to build a new {@link ConnectionProfile}
|
||||
*/
|
||||
|
@ -64,7 +64,7 @@ public class ConnectionManagerTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testConnectionProfileResolve() {
|
||||
final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));
|
||||
|
||||
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
@ -96,7 +96,7 @@ public class ConnectionManagerTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testDefaultConnectionProfile() {
|
||||
ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
assertEquals(13, profile.getNumConnections());
|
||||
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
|
||||
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
|
||||
@ -104,7 +104,7 @@ public class ConnectionManagerTests extends ESTestCase {
|
||||
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
|
||||
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
|
||||
|
||||
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
|
||||
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
|
||||
assertEquals(12, profile.getNumConnections());
|
||||
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
|
||||
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
|
||||
@ -112,7 +112,7 @@ public class ConnectionManagerTests extends ESTestCase {
|
||||
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
|
||||
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
|
||||
|
||||
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
|
||||
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
|
||||
assertEquals(11, profile.getNumConnections());
|
||||
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
|
||||
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
|
||||
@ -120,7 +120,7 @@ public class ConnectionManagerTests extends ESTestCase {
|
||||
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
|
||||
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
|
||||
|
||||
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
|
||||
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
|
||||
.put("node.master", false).build());
|
||||
assertEquals(10, profile.getNumConnections());
|
||||
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
|
||||
|
@ -116,7 +116,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
|
||||
|
||||
protected int channelsPerNodeConnection() {
|
||||
return 13;
|
||||
// This is a customized profile for this test case.
|
||||
return 6;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -125,9 +126,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
|
||||
Settings connectionSettings = Settings.builder()
|
||||
.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
|
||||
.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
|
||||
.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2)
|
||||
.put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
|
||||
.put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1)
|
||||
.build();
|
||||
|
||||
serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates
|
||||
nodeA = serviceA.getLocalNode();
|
||||
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
|
||||
serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
|
||||
nodeB = serviceB.getLocalNode();
|
||||
// wait till all nodes are properly connected and the event has been sent, so tests in this class
|
||||
// will not get this callback called on the connections done in this setup
|
||||
@ -174,7 +183,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
}
|
||||
|
||||
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
|
||||
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
|
||||
return buildService(name, version, clusterSettings, Settings.EMPTY);
|
||||
}
|
||||
|
||||
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
|
||||
Settings settings) {
|
||||
return buildService(name, version, clusterSettings, settings, true, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1999,7 +2013,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
assertEquals("handshake failed", exception.getCause().getMessage());
|
||||
}
|
||||
|
||||
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
|
||||
TcpTransport.NodeChannels connection = originalTransport.openConnection(
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
|
||||
|
@ -50,7 +50,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
|
||||
}
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
|
||||
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionManager;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
@ -111,7 +110,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi
|
||||
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
||||
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
|
||||
|
||||
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
|
||||
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
|
||||
TcpTransport.NodeChannels connection = originalTransport.openConnection(
|
||||
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
|
||||
|
@ -93,7 +93,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu
|
||||
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
|
||||
Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
|
@ -55,7 +55,7 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans
|
||||
|
||||
};
|
||||
MockTransportService mockTransportService =
|
||||
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
|
||||
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
|
||||
Collections.emptySet());
|
||||
mockTransportService.start();
|
||||
return mockTransportService;
|
||||
|
Loading…
x
Reference in New Issue
Block a user