Fix open/close race in ConnectionManagerTests (#50621)

Currently we reuse the same test connection for all connection attempts
in the testConcurrentConnectsAndDisconnects test. This means that if the
connection fails due to a pre-existing connection, the connection will
be closed impacting the state of all connection attempts. This commit
fixes the test, by returning a unique connection for each attempt.

Fixes #49903.
This commit is contained in:
Tim Brooks 2020-01-08 14:33:11 -06:00
parent 1577a0e617
commit 27c2eb744e
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
1 changed files with 41 additions and 11 deletions

View File

@ -33,8 +33,11 @@ import org.junit.Before;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@ -124,16 +127,22 @@ public class ConnectionManagerTests extends ESTestCase {
assertEquals(1, nodeDisconnectedCount.get());
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/49903")
public void testConcurrentConnectsAndDisconnects() throws BrokenBarrierException, InterruptedException {
public void testConcurrentConnects() throws Exception {
Set<Transport.Connection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
if (rarely()) {
boolean success = randomBoolean();
if (success) {
Transport.Connection connection = new TestConnect(node);
connections.add(connection);
if (randomBoolean()) {
listener.onResponse(connection);
} else if (frequently()) {
} else {
threadPool.generic().execute(() -> listener.onResponse(connection));
}
} else {
threadPool.generic().execute(() -> listener.onFailure(new IllegalStateException("dummy exception")));
}
@ -143,10 +152,13 @@ public class ConnectionManagerTests extends ESTestCase {
assertFalse(connectionManager.nodeConnected(node));
ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
if (rarely()) {
boolean success = randomBoolean();
if (success) {
if (randomBoolean()) {
l.onResponse(null);
} else if (frequently()) {
} else {
threadPool.generic().execute(() -> l.onResponse(null));
}
} else {
threadPool.generic().execute(() -> l.onFailure(new IllegalStateException("dummy exception")));
}
@ -198,6 +210,24 @@ public class ConnectionManagerTests extends ESTestCase {
});
assertEquals(10, nodeConnectedCount.get() + nodeFailureCount.get());
int managedConnections = connectionManager.size();
if (managedConnections != 0) {
assertEquals(1, managedConnections);
// Only a single connection attempt should be open.
assertEquals(1, connections.stream().filter(c -> c.isClosed() == false).count());
} else {
// No connections succeeded
assertEquals(0, connections.stream().filter(c -> c.isClosed() == false).count());
}
connectionManager.close();
// The connection manager will close all open connections
for (Transport.Connection connection : connections) {
assertTrue(connection.isClosed());
}
}
public void testConnectFailsDuringValidation() {