From 2d02a265279156cc2885fd93baf5751de8b2c655 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 24 Oct 2016 14:27:00 +0100 Subject: [PATCH] ARTEMIS-780 Added ANYCAST routing to local queues --- .../core/postoffice/AddressManager.java | 2 + .../artemis/core/postoffice/PostOffice.java | 2 + .../core/postoffice/impl/BindingsImpl.java | 1 + .../postoffice/impl/LocalQueueBinding.java | 9 +- .../core/postoffice/impl/PostOfficeImpl.java | 5 + .../postoffice/impl/SimpleAddressManager.java | 15 + .../artemis/core/server/ActiveMQServer.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 12 +- .../artemis/core/server/impl/AddressInfo.java | 12 +- .../server/impl/PostOfficeJournalLoader.java | 3 +- .../core/server/impl/QueueFactoryImpl.java | 8 + .../config/impl/FileConfigurationTest.java | 4 +- .../addressing/AddressingTest.java | 241 ++++++++++++++- .../integration/client/HangConsumerTest.java | 2 +- .../AnycastRoutingWithClusterTest.java | 276 ++++++++++++++++++ .../cluster/distribution/ClusterTestBase.java | 14 + .../jms/client/TopicCleanupTest.java | 2 +- .../server/impl/fakes/FakePostOffice.java | 5 + 18 files changed, 591 insertions(+), 24 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AnycastRoutingWithClusterTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 5519822186..1cf1a07a19 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -54,6 +54,8 @@ public interface AddressManager { AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index f7199661cd..79023520ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -45,6 +45,8 @@ public interface PostOffice extends ActiveMQComponent { AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index e5df737c70..6be0311df0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -262,6 +262,7 @@ public final class BindingsImpl implements Bindings { boolean routed = false; for (Binding binding : exclusiveBindings) { + if (binding.getFilter() == null || binding.getFilter().match(message)) { binding.getBindable().route(message, context); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 2a6d9c50be..292138823c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -24,10 +24,11 @@ import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class LocalQueueBinding implements QueueBinding { - private final SimpleString address; + private final AddressInfo address; private final Queue queue; @@ -37,7 +38,7 @@ public class LocalQueueBinding implements QueueBinding { private final SimpleString clusterName; - public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) { + public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) { this.address = address; this.queue = queue; @@ -61,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getAddress() { - return address; + return address.getName(); } @Override @@ -76,7 +77,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getRoutingName() { - return name; + return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9b7ed0c45c..6c654bf3c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -424,6 +424,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return addressManager.addAddressInfo(addressInfo); } + @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return addressManager.addOrUpdateAddressInfo(addressInfo); + } + @Override public AddressInfo removeAddressInfo(SimpleString address) { return addressManager.removeAddressInfo(address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 2994f9e1f4..969a1a916a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -187,6 +187,21 @@ public class SimpleAddressManager implements AddressManager { return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo); } + @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + AddressInfo from = addAddressInfo(addressInfo); + return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo); + } + + private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) { + synchronized (from) { + from.setRoutingType(to.getRoutingType()); + from.setDefaultMaxConsumers(to.getDefaultMaxConsumers()); + from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers()); + return from; + } + } + @Override public AddressInfo removeAddressInfo(SimpleString address) { return addressInfoMap.remove(address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index ccb2ae7e05..f0e2336145 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -425,7 +425,7 @@ public interface ActiveMQServer extends ActiveMQComponent { void removeClientConnection(String clientId); - AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo); AddressInfo removeAddressInfo(SimpleString address); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 57683c5153..ede8470619 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2163,7 +2163,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers()); info.setDefaultMaxConsumers(config.getDefaultMaxConsumers()); - addAddressInfo(info); + createOrUpdateAddressInfo(info); deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); } } @@ -2267,8 +2267,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public AddressInfo addAddressInfo(AddressInfo addressInfo) { - return postOffice.addAddressInfo(addressInfo); + public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) { + return postOffice.addOrUpdateAddressInfo(addressInfo); } @Override @@ -2278,7 +2278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public AddressInfo getAddressInfo(SimpleString address) { - return postOffice.removeAddressInfo(address); + return postOffice.getAddressInfo(address); } private Queue createQueue(final SimpleString addressName, @@ -2314,15 +2314,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); - addAddressInfo(new AddressInfo(queue.getAddress())); - if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } else if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName())); } - final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); if (queue.isDurable()) { storageManager.addQueueBinding(txID, localQueueBinding); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 4c6ec1f2af..1449107201 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -22,7 +22,7 @@ public class AddressInfo { private final SimpleString name; - private RoutingType routingType = RoutingType.Multicast; + private RoutingType routingType = RoutingType.MULTICAST; private boolean defaultDeleteOnNoConsumers; @@ -61,13 +61,13 @@ public class AddressInfo { } public enum RoutingType { - Multicast, Anycast; + MULTICAST, ANYCAST; public byte getType() { switch (this) { - case Multicast: + case MULTICAST: return 0; - case Anycast: + case ANYCAST: return 1; default: return -1; @@ -77,9 +77,9 @@ public class AddressInfo { public static RoutingType getType(byte type) { switch (type) { case 0: - return Multicast; + return MULTICAST; case 1: - return Anycast; + return ANYCAST; default: return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 9a8ae74552..71c5b2bd83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader { } } - final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); + queues.put(queue.getID(), queue); postOffice.addBinding(binding); managementService.registerAddress(queue.getAddress()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 5686c7beb5..367855392d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -68,6 +68,10 @@ public class QueueFactoryImpl implements QueueFactory { @Override public Queue createQueueWith(final QueueConfig config) { + + // Add default address info if one doesn't exist + postOffice.addAddressInfo(new AddressInfo(config.address())); + final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { @@ -89,6 +93,10 @@ public class QueueFactoryImpl implements QueueFactory { final boolean durable, final boolean temporary, final boolean autoCreated) { + + // Add default address info if one doesn't exist + postOffice.addAddressInfo(new AddressInfo(address)); + AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); Queue queue; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index ba9cc1a554..0d53a71ae5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -376,7 +376,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 1 CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0); assertEquals("addr1", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.Anycast, addressConfiguration.getRoutingType()); + assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 1 Queue 1 @@ -402,7 +402,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 2 addressConfiguration = conf.getAddressConfigurations().get(1); assertEquals("addr2", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.Multicast, addressConfiguration.getRoutingType()); + assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 2 Queue 1 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 43d60719df..03739e9ac9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -16,6 +16,245 @@ */ package org.apache.activemq.artemis.tests.integration.addressing; -public class AddressingTest { +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class AddressingTest extends ActiveMQTestBase { + + private ActiveMQServer server; + + private ClientSessionFactory sessionFactory; + + @Before + public void setup() throws Exception { + server = createServer(true); + server.start(); + + server.waitForActivation(10, TimeUnit.SECONDS); + + ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + sessionFactory = sl.createSessionFactory(); + + addSessionFactory(sessionFactory); + } + + @Test + public void testMulticastRouting() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert both queues receive message + + AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); + addressInfo.setRoutingType(AddressInfo.RoutingType.MULTICAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + assertNotNull(consumer1.receive(2000)); + assertNotNull(consumer2.receive(2000)); + + q1.deleteQueue(); + q2.deleteQueue(); + } + } + + @Test + public void testAnycastRouting() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert one queue receive message + + AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + int count = 0; + count = (consumer1.receive(1000) == null) ? count : count + 1; + count = (consumer2.receive(1000) == null) ? count : count + 1; + assertEquals(1, count); + + q1.deleteQueue(); + q2.deleteQueue(); + } + } + + @Test + public void testAnycastRoutingRoundRobin() throws Exception { + + SimpleString address = new SimpleString("test.address"); + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(address, address.concat(".1"), null, true, false); + Queue q2 = server.createQueue(address, address.concat(".2"), null, true, false); + Queue q3 = server.createQueue(address, address.concat(".3"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientProducer producer = session.createProducer(address); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + ClientConsumer consumer3 = session.createConsumer(q3.getName()); + List consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3})); + + List messages = new ArrayList<>(); + messages.add("Message1"); + messages.add("Message2"); + messages.add("Message3"); + + ClientMessage clientMessage; + for (String message : messages) { + clientMessage = session.createMessage(true); + clientMessage.getBodyBuffer().writeString(message); + producer.send(clientMessage); + } + + String m; + for (ClientConsumer consumer : consumers) { + clientMessage = consumer.receive(1000); + m = clientMessage.getBodyBuffer().readString(); + messages.remove(m); + } + + assertTrue(messages.isEmpty()); + + // Check we don't receive more messages + int count = 0; + for (ClientConsumer consumer : consumers) { + count = (consumer.receive(1000) == null) ? count : count + 1; + } + assertEquals(0, count); + } + + + + @Test + public void testMulticastRoutingBackwardsCompat() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert both queues receive message + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + assertNotNull(consumer1.receive(2000)); + assertNotNull(consumer2.receive(2000)); + + q1.deleteQueue(); + q2.deleteQueue(); + } + } + + @Ignore + @Test + public void testDeleteQueueOnNoConsumersTrue() { + fail("Not Implemented"); + } + + @Ignore + @Test + public void testDeleteQueueOnNoConsumersFalse() { + fail("Not Implemented"); + } + + @Ignore + @Test + public void testLimitOnMaxConsumers() { + fail("Not Implemented"); + } + + @Ignore + @Test + public void testUnlimitedMaxConsumers() { + fail("Not Implemented"); + } + + @Ignore + @Test + public void testDefaultMaxConsumersFromAddress() { + fail("Not Implemented"); + } + + @Ignore + @Test + public void testDefaultDeleteOnNoConsumersFromAddress() { + fail("Not Implemented"); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 83d28a12d5..2fd5915549 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -353,7 +353,7 @@ public class HangConsumerTest extends ActiveMQTestBase { long txID = server.getStorageManager().generateID(); // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(server.getAddressInfo(QUEUE), new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AnycastRoutingWithClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AnycastRoutingWithClusterTest.java new file mode 100644 index 0000000000..f413113e37 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AnycastRoutingWithClusterTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import java.util.List; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Test; + +public class AnycastRoutingWithClusterTest extends ClusterTestBase { + + /** + * Test anycast address with single distributed queue in a 3 node cluster environment. Messages should be + * "round robin"'d across the each queue + * @throws Exception + */ + @Test + public void testAnycastAddressOneQueueRoutingMultiNode() throws Exception { + String address = "test.address"; + String queueName = "test.queue"; + String clusterAddress = "test"; + + for (int i = 0; i < 3; i++) { + setupServer(i, isFileStorage(), isNetty()); + } + + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1); + + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + + startServers(0, 1, 2); + + List queues; + for (int i = 0; i < 3; i++) { + createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false); + setupSessionFactory(i, isNetty()); + createQueue(i, address, queueName, null, false); + addConsumer(i, i, queueName, null); + } + + for (int i = 0; i < 3; i++) { + waitForBindings(i, address, 1, 1, true); + waitForBindings(i, address, 2, 2, false); + } + + final int noMessages = 30; + send(0, address, noMessages, true, null, null); + + for (int s = 0; s < 3; s++) { + final Queue queue = servers[s].locateQueue(new SimpleString(queueName)); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return queue.getMessageCount() == noMessages / 3; + } + }); + } + + // Each consumer should receive noMessages / noServers + for (int i = 0; i < noMessages / 3; i++) { + for (int c = 0; c < 3; c++) { + assertNotNull(consumers[c].consumer.receive(1000)); + } + } + } + + + /** + * Test anycast address with N queues in a 3 node cluster environment. Messages should be "round robin"'d across the + * each queue. + * @throws Exception + */ + @Test + public void testAnycastAddressMultiQueuesRoutingMultiNode() throws Exception { + + String address = "test.address"; + String queueNamePrefix = "test.queue"; + String clusterAddress = "test"; + + for (int i = 0; i < 3; i++) { + setupServer(i, isFileStorage(), isNetty()); + } + + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1); + + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + + startServers(0, 1, 2); + + List queues; + for (int i = 0; i < 3; i++) { + createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false); + setupSessionFactory(i, isNetty()); + createQueue(i, address, queueNamePrefix + i, null, false); + addConsumer(i, i, queueNamePrefix + i, null); + } + + for (int i = 0; i < 3; i++) { + waitForBindings(i, address, 1, 1, true); + waitForBindings(i, address, 2, 2, false); + } + + final int noMessages = 30; + send(0, address, noMessages, true, null, null); + + for (int s = 0; s < 3; s++) { + final Queue queue = servers[s].locateQueue(new SimpleString(queueNamePrefix + s)); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return queue.getMessageCount() == noMessages / 3; + } + }); + } + + // Each consumer should receive noMessages / noServers + for (int i = 0; i < noMessages / 3; i++) { + for (int c = 0; c < 3; c++) { + assertNotNull(consumers[c].consumer.receive(1000)); + } + } + } + + /** + * Test anycast address with N queues in a 3 node cluster environment. Messages should be "round robin"'d across the + * each queue. + * @throws Exception + */ + @Test + public void testAnycastAddressMultiQueuesWithFilterRoutingMultiNode() throws Exception { + + String address = "test.address"; + String queueNamePrefix = "test.queue"; + String clusterAddress = "test"; + + for (int i = 0; i < 3; i++) { + setupServer(i, isFileStorage(), isNetty()); + } + + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1); + + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + + startServers(0, 1, 2); + + List queues; + for (int i = 0; i < 3; i++) { + createAddressInfo(i, address, AddressInfo.RoutingType.ANYCAST, -1, false); + setupSessionFactory(i, isNetty()); + + } + + String filter1 = "giraffe"; + String filter2 = "platypus"; + + createQueue(0, address, queueNamePrefix + 0, filter1, false); + createQueue(1, address, queueNamePrefix + 1, filter1, false); + createQueue(2, address, queueNamePrefix + 2, filter2, false); + + for (int i = 0; i < 3; i++) { + addConsumer(i, i, queueNamePrefix + i, null); + } + + for (int i = 0; i < 3; i++) { + waitForBindings(i, address, 1, 1, true); + waitForBindings(i, address, 2, 2, false); + } + + final int noMessages = 30; + send(0, address, noMessages, true, filter1, null); + + // Each consumer should receive noMessages / noServers + for (int i = 0; i < noMessages / 2; i++) { + for (int c = 0; c < 2; c++) { + assertNotNull(consumers[c].consumer.receive(1000)); + } + } + + assertNull(consumers[2].consumer.receive(1000)); + } + + /** + * Test multicast address that with N queues in a 3 node cluster environment. Each queue should receive all messages + * sent from the client. + * @throws Exception + */ + @Test + public void testMulitcastAddressMultiQueuesRoutingMultiNode() throws Exception { + + String address = "test.address"; + String queueNamePrefix = "test.queue"; + String clusterAddress = "test"; + + for (int i = 0; i < 3; i++) { + setupServer(i, isFileStorage(), isNetty()); + } + + setupClusterConnection("cluster0", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", clusterAddress, MessageLoadBalancingType.STRICT, 1, isNetty(), 2, 0, 1); + + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + + startServers(0, 1, 2); + + List queues; + for (int i = 0; i < 3; i++) { + createAddressInfo(i, address, AddressInfo.RoutingType.MULTICAST, -1, false); + setupSessionFactory(i, isNetty()); + createQueue(i, address, queueNamePrefix + i, null, false); + addConsumer(i, i, queueNamePrefix + i, null); + } + + for (int i = 0; i < 3; i++) { + waitForBindings(i, address, 1, 1, true); + waitForBindings(i, address, 2, 2, false); + } + + final int noMessages = 30; + send(0, address, noMessages, true, null, null); + + for (int s = 0; s < 3; s++) { + final Queue queue = servers[s].locateQueue(new SimpleString(queueNamePrefix + s)); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return queue.getMessageCount() == noMessages; + } + }); + } + + // Each consumer should receive noMessages + for (int i = 0; i < noMessages; i++) { + for (int c = 0; c < 3; c++) { + assertNotNull(consumers[c].consumer.receive(1000)); + } + } + } + + private boolean isNetty() { + return true; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 538779ffe7..2623e9c699 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -518,6 +519,19 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { session.close(); } + protected void createAddressInfo(final int node, + final String address, + final AddressInfo.RoutingType routingType, + final int defaulMaxConsumers, + boolean defaultDeleteOnNoConsumers) { + AddressInfo addressInfo = new AddressInfo(new SimpleString(address)); + addressInfo.setRoutingType(routingType); + addressInfo.setDefaultMaxConsumers(defaulMaxConsumers); + addressInfo.setDefaultDeleteOnNoConsumers(defaultDeleteOnNoConsumers); + + servers[node].createOrUpdateAddressInfo(addressInfo); + } + protected void deleteQueue(final int node, final String queueName) throws Exception { ClientSessionFactory sf = sfs[node]; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index 280596aadc..ec279eea1f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -83,7 +83,7 @@ public class TopicCleanupTest extends JMSTestBase { final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); - LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); + LocalQueueBinding binding = new LocalQueueBinding(server.getAddressInfo(queue.getAddress()), queue, server.getNodeID()); storage.addQueueBinding(txid, binding); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 9424fc3048..512f0f2bca 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -65,6 +65,11 @@ public class FakePostOffice implements PostOffice { return null; } + @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return null; + } + @Override public AddressInfo removeAddressInfo(SimpleString address) {