diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index b00eb02867..1d6cf4f876 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1375,11 +1375,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params); } - if (ha) { - locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc); - } else { - locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc); - } + setSessionFactoryCreateLocator(node, ha, serverTotc); locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node])); @@ -1392,6 +1388,14 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { sfs[node] = sf; } + protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) { + if (ha) { + locators[node] = ActiveMQClient.createServerLocatorWithHA(serverTotc); + } else { + locators[node] = ActiveMQClient.createServerLocatorWithoutHA(serverTotc); + } + } + protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception { if (sfs[node] != null) { throw new IllegalArgumentException("Already a server at " + node); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 69c03cc446..2020489a15 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -24,12 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.server.Bindable; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; @@ -37,6 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -63,6 +66,17 @@ public class MessageRedistributionTest extends ClusterTestBase { return false; } + + + @Override + protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) { + super.setSessionFactoryCreateLocator(node, ha, serverTotc); + + locators[node].setConsumerWindowSize(0); + + } + + //https://issues.jboss.org/browse/HORNETQ-1061 @Test public void testRedistributionWithMessageGroups() throws Exception { @@ -989,7 +1003,17 @@ public class MessageRedistributionTest extends ClusterTestBase { removeConsumer(0); addConsumer(1, 1, "queue0", null); - verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1); + Queue queue = servers[1].locateQueue(SimpleString.toSimpleString("queue0")); + Assert.assertNotNull(queue); + Wait.waitFor(() -> queue.getMessageCount() == QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2); + + for (int i = 0; i < QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2; i++) { + ClientMessage message = consumers[1].getConsumer().receive(5000); + Assert.assertNotNull(message); + message.acknowledge(); + } + + Assert.assertNull(consumers[1].getConsumer().receiveImmediate()); } /*