ARTEMIS-4252 Fix flaky QuorumFailOverTest tests

This commit is contained in:
Domenico Francesco Bruscino 2023-04-22 20:24:37 +02:00 committed by clebertsuconic
parent fde9d223ae
commit ef4501292e
4 changed files with 22 additions and 0 deletions

View File

@ -89,6 +89,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
long getCallTimeout(); long getCallTimeout();
Bridge[] getBridges();
/** /**
* The metric for this cluster connection * The metric for this cluster connection
* *

View File

@ -630,6 +630,11 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return callTimeout; return callTimeout;
} }
@Override
public Bridge[] getBridges() {
return records.values().stream().map(MessageFlowRecord::getBridge).toArray(Bridge[]::new);
}
@Override @Override
public Map<String, String> getNodes() { public Map<String, String> getNodes() {
synchronized (recordsGuard) { synchronized (recordsGuard) {

View File

@ -41,6 +41,7 @@ import java.lang.ref.WeakReference;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -120,6 +121,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
@ -1328,6 +1330,17 @@ public abstract class ActiveMQTestBase extends Assert {
} }
} }
protected static final void waitForBridges(final ActiveMQServer server, int connectedBridges) throws Exception {
waitForBridges(server.getClusterManager().getDefaultConnection(null), connectedBridges);
}
protected static final void waitForBridges(ClusterConnection clusterConnection, int connectedBridges) throws Exception {
Bridge[] bridges = clusterConnection.getBridges();
Wait.assertTrue(() -> Arrays.stream(clusterConnection.getBridges())
.filter(bridge -> bridge.isConnected()).count() >= connectedBridges);
}
protected static final Map<String, Object> generateParams(final int node, final boolean netty) { protected static final Map<String, Object> generateParams(final int node, final boolean netty) {
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();

View File

@ -66,6 +66,7 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
startServers(3, 4, 5); startServers(3, 4, 5);
for (int i : liveServerIDs) { for (int i : liveServerIDs) {
waitForBridges(servers[i], 2);
waitForTopology(servers[i], 3, 3); waitForTopology(servers[i], 3, 3);
} }
@ -122,6 +123,7 @@ public class QuorumFailOverTest extends StaticClusterWithBackupFailoverTest {
} }
for (int i : liveServerIDs) { for (int i : liveServerIDs) {
waitForBridges(servers[i], 2);
waitForTopology(servers[i], 3, 3); waitForTopology(servers[i], 3, 3);
} }