diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java index 3539c03199..dc0aeb42f2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.cluster; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -24,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.ConsumerThread; import org.junit.Test; @@ -54,8 +56,10 @@ public class MessageRedistributionTest extends ClusterTestBase { setupSessionFactory(0, true); setupSessionFactory(1, true); - createQueue(0, "queues.testaddress", "queue0", null, false); - createQueue(1, "queues.testaddress", "queue0", null, false); + createAddressInfo(0, "queues.testAddress", RoutingType.ANYCAST, -1, false); + createAddressInfo(1, "queues.testAddress", RoutingType.ANYCAST, -1, false); + createQueue(0, "queues.testaddress", "queue0", null, true); + createQueue(1, "queues.testaddress", "queue0", null, true); //alternately create consumers to the 2 nodes //close the connection then close consumer quickly @@ -94,11 +98,27 @@ public class MessageRedistributionTest extends ClusterTestBase { conn.close(); } + Wait.waitFor(() -> getRemoteQueueBinding(servers[remoteNode]) != null); + //check remote server's consumer count - ActiveMQServer remoteServer = servers[remoteNode]; + RemoteQueueBinding remoteBinding = getRemoteQueueBinding(servers[remoteNode]); + + assertNotNull(remoteBinding); + + Wait.waitFor(() -> remoteBinding.consumerCount() >= 0); + int count = remoteBinding.consumerCount(); + assertTrue("consumer count should never be negative " + count, count >= 0); + } + + private RemoteQueueBinding getRemoteQueueBinding(ActiveMQServer server) throws Exception { + ActiveMQServer remoteServer = server; Bindings bindings = remoteServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")); Collection bindingSet = bindings.getBindings(); + return getRemoteQueueBinding(bindingSet); + } + + private RemoteQueueBinding getRemoteQueueBinding(Collection bindingSet) { RemoteQueueBinding remoteBinding = null; for (Binding b : bindingSet) { if (b instanceof RemoteQueueBinding) { @@ -106,9 +126,6 @@ public class MessageRedistributionTest extends ClusterTestBase { break; } } - - assertNotNull(remoteBinding); - int count = remoteBinding.consumerCount(); - assertTrue("consumer count should never be negative " + count, count >= 0); + return remoteBinding; } }