diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java index 7dcd81e621..5af8de146e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java @@ -26,13 +26,14 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; @@ -51,6 +52,12 @@ public class FederatedQueueTest extends FederatedTestBase { } + @Override + protected void configureQueues(ActiveMQServer server) throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)); + createSimpleQueue(server, getName()); + } + protected ConnectionFactory getCF(int i) throws Exception { return new ActiveMQConnectionFactory("vm://" + i); } @@ -110,10 +117,8 @@ public class FederatedQueueTest extends FederatedTestBase { MessageConsumer consumer0 = session0.createConsumer(queue0); MessageConsumer consumer1 = session1.createConsumer(queue1); - Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null); - //Wait for local and federated consumer to be established on Server 1 - assertTrue(Wait.waitFor(() -> getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() == 2, - 5000, 100)); + + Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 2)); MessageProducer producer1 = session1.createProducer(queue1); producer1.send(session1.createTextMessage("hello")); @@ -265,10 +270,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueBiDirectionalUpstream() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); @@ -283,10 +284,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueBiDirectionalDownstream() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", queueName, "server0"); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); getServer(0).getFederationManager().deploy(); @@ -301,10 +298,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueBiDirectionalDownstreamUpstream() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, false, "server0"); @@ -319,10 +312,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueBiDirectionalDownstreamUpstreamSharedConnection() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, true, "server0"); @@ -338,10 +327,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueShareUpstreamConnectionFalse() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, false, "server0"); @@ -355,10 +340,6 @@ public class FederatedQueueTest extends FederatedTestBase { @Test public void testFederatedQueueShareUpstreamConnectionTrue() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", queueName, null, true, "server0"); @@ -459,11 +440,6 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueChainOfBrokers() throws Exception { String queueName = getName(); - //Set queue up on all three brokers - for (int i = 0; i < 3; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } - //Connect broker 0 (consumer will be here at end of chain) to broker 1 FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName, true); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0); @@ -502,11 +478,6 @@ public class FederatedQueueTest extends FederatedTestBase { public void testFederatedQueueRemoteBrokerRestart() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } - FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -538,6 +509,8 @@ public class FederatedQueueTest extends FederatedTestBase { assertNull(consumer0.receiveNoWait()); getServer(1).start(); + Wait.assertTrue(getServer(1)::isActive); + createSimpleQueue(getServer(1), getName()); connection1 = cf1.createConnection(); connection1.start(); @@ -546,22 +519,27 @@ public class FederatedQueueTest extends FederatedTestBase { producer = session1.createProducer(queue1); producer.send(session1.createTextMessage("hello")); - Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null); - - Wait.waitFor(() -> ((QueueBinding) getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount() == 1); + Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 1)); assertNotNull(consumer0.receive(1000)); } + private boolean getConsumerCount(ActiveMQServer server, String queueName, int count) { + QueueBinding binding = (QueueBinding)server.getPostOffice().getBinding(SimpleString.toSimpleString(queueName)); + if (binding == null) { + return false; + } + if (binding.consumerCount() != count) { + return false; + } + + return true; + } + @Test public void testFederatedQueueLocalBrokerRestart() throws Exception { String queueName = getName(); - //Set queue up on both brokers - for (int i = 0; i < 2; i++) { - getServer(i).createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false); - } - FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); @@ -594,6 +572,7 @@ public class FederatedQueueTest extends FederatedTestBase { getServer(0).start(); Wait.waitFor(() -> getServer(0).isActive()); + createSimpleQueue(getServer(0), getName()); connection0 = getCF(0).createConnection(); connection0.start(); @@ -608,7 +587,7 @@ public class FederatedQueueTest extends FederatedTestBase { .getBinding(SimpleString.toSimpleString(queueName))) .consumerCount() == 1); - assertNotNull(consumer0.receive(1000)); + assertNotNull(consumer0.receive(5000)); } private Message createTextMessage(Session session1, String group) throws JMSException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java index 00f711d86a..1cb6f19baa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java @@ -20,9 +20,13 @@ import java.util.ArrayList; import java.util.List; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Before; @@ -47,11 +51,27 @@ public class FederatedTestBase extends ActiveMQTestBase { config.addConnectorConfiguration("server" + j, "vm://" + j); } ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false)); + servers.add(server); server.start(); + + configureQueues(server); } } + protected void configureQueues(ActiveMQServer server) throws Exception { + } + + protected void createSimpleQueue(ActiveMQServer server, String queueName) throws Exception { + SimpleString simpleStringQueueName = SimpleString.toSimpleString(queueName); + try { + server.addAddressInfo(new AddressInfo(simpleStringQueueName, RoutingType.ANYCAST)); + server.createQueue(simpleStringQueueName, RoutingType.ANYCAST, simpleStringQueueName, null, true, false); + } catch (Exception ignored) { + } + + } + protected int numberOfServers() { return 3; }