From 4b88f38b2df6b3c401542884ffe97c8a49069a5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Fri, 7 Sep 2018 06:05:33 +0100 Subject: [PATCH] ARTEMIS-2076 Make Filter update-able Add Tests Add implementation inline with other queue updatable settings. Enhance tests to ensure queue is not destroyed during config change and messages in queue already are preserved --- .../management/ActiveMQServerControl.java | 1 + .../impl/ActiveMQServerControlImpl.java | 5 +- .../artemis/core/postoffice/PostOffice.java | 2 + .../postoffice/impl/LocalQueueBinding.java | 8 +- .../core/postoffice/impl/PostOfficeImpl.java | 5 + .../artemis/core/server/ActiveMQServer.java | 1 + .../activemq/artemis/core/server/Queue.java | 2 + .../core/server/impl/ActiveMQServerImpl.java | 8 +- .../artemis/core/server/impl/QueueImpl.java | 10 +- .../impl/ScheduledDeliveryHandlerTest.java | 4 + .../tests/integration/jms/RedeployTest.java | 97 +++++++++++++++++++ .../ActiveMQServerControlUsingCoreTest.java | 12 ++- .../persistence/ConfigChangeTest.java | 59 +++++++++++ .../resources/reload-queue-filter-updated.xml | 42 ++++++++ .../test/resources/reload-queue-filter.xml | 42 ++++++++ .../unit/core/postoffice/impl/FakeQueue.java | 5 + .../server/impl/fakes/FakePostOffice.java | 2 + 17 files changed, 290 insertions(+), 15 deletions(-) create mode 100644 tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml create mode 100644 tests/integration-tests/src/test/resources/reload-queue-filter.xml diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 6ce945c411..5719fb6698 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -705,6 +705,7 @@ public interface ActiveMQServerControl { @Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION) String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "filter", desc = "The filter to use on the queue") String filter, @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers, @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index f708c1dfb6..ec6b0dfd88 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -868,12 +868,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active Boolean purgeOnNoConsumers, Boolean exclusive, String user) throws Exception { - return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); + return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); } @Override public String updateQueue(String name, String routingType, + String filter, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, @@ -885,7 +886,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user); + final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user); if (queue == null) { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name)); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index ce1fcfd172..d31e33b4ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -66,6 +67,7 @@ public interface PostOffice extends ActiveMQComponent { QueueBinding updateQueue(SimpleString name, RoutingType routingType, + Filter filter, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 176d614071..79af5d075d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -32,8 +32,6 @@ public class LocalQueueBinding implements QueueBinding { private final Queue queue; - private final Filter filter; - private final SimpleString clusterName; private SimpleString name; @@ -45,8 +43,6 @@ public class LocalQueueBinding implements QueueBinding { this.name = queue.getName(); - filter = queue.getFilter(); - clusterName = queue.getName().concat(nodeID); } @@ -57,7 +53,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public Filter getFilter() { - return filter; + return queue.getFilter(); } @Override @@ -158,7 +154,7 @@ public class LocalQueueBinding implements QueueBinding { ", queue=" + queue + ", filter=" + - filter + + getFilter() + ", name=" + name + ", clusterName=" + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 598c32b46b..d4cde18793 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -465,6 +465,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public QueueBinding updateQueue(SimpleString name, RoutingType routingType, + Filter filter, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, @@ -522,6 +523,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding changed = true; queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue()); } + if (filter != null && !filter.equals(queue.getFilter())) { + changed = true; + queue.setFilter(filter); + } if (logger.isDebugEnabled()) { if (user == null && queue.getUser() != null) { logger.debug("Ignoring updating Queue to a NULL user"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 1883362eaf..488c6fdede 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -537,6 +537,7 @@ public interface ActiveMQServer extends ServiceComponent { Queue updateQueue(String name, RoutingType routingType, + String filterString, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 70051c0cb9..63d39c7c16 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -45,6 +45,8 @@ public interface Queue extends Bindable,CriticalComponent { Filter getFilter(); + void setFilter(Filter filter); + PageSubscription getPageSubscription(); RoutingType getRoutingType(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 498f4e9def..dcb6e02d75 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2790,7 +2790,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch(); if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) { - updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser()); + updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser()); } else { // if the address::queue doesn't exist then create it try { @@ -3252,19 +3252,21 @@ public class ActiveMQServerImpl implements ActiveMQServer { Boolean purgeOnNoConsumers, Boolean exclusive, String user) throws Exception { - return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); + return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); } @Override public Queue updateQueue(String name, RoutingType routingType, + String filterString, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception { - final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user)); + final Filter filter = FilterImpl.createFilter(filterString); + final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user)); if (queueBinding != null) { final Queue queue = queueBinding.getQueue(); return queue; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8af8aaa763..2891350e6a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -40,8 +40,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; - import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; @@ -119,6 +119,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private static final Logger logger = Logger.getLogger(QueueImpl.class); private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching"); private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime"); + private static final AtomicReferenceFieldUpdater filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter"); public static final int REDISTRIBUTOR_BATCH_SIZE = 100; @@ -695,7 +696,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public Filter getFilter() { - return filter; + return filterUpdater.get(this); + } + + @Override + public void setFilter(Filter filter) { + filterUpdater.set(this, filter); } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 96de8c761b..b21781d2ab 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -882,6 +882,10 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public void setFilter(Filter filter) { + } + @Override public PageSubscription getPageSubscription() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index ead96d5353..600eead68b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -38,6 +38,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; @@ -106,6 +107,102 @@ public class RedeployTest extends ActiveMQTestBase { } } + @Test + public void testRedeployFilter() throws Exception { + Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); + URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml"); + URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated.xml"); + Files.copy(url1.openStream(), brokerXML); + + EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); + embeddedActiveMQ.start(); + + final ReusableLatch latch = new ReusableLatch(1); + + Runnable tick = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + + try { + latch.await(10, TimeUnit.SECONDS); + + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setStringProperty("x", "x"); + producer.send(message); + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer.receive(5000)); + consumer.close(); + } + + //Send a message that should remain in the queue (this ensures config change is non-destructive) + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(queue); + Message message = session.createTextMessage("hello"); + message.setStringProperty("x", "x"); + producer.send(message); + } + + Binding binding = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")); + + Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); + brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); + latch.setCount(1); + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + latch.await(10, TimeUnit.SECONDS); + + Binding bindingAfterChange = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")); + + assertTrue("Instance should be the same (as should be non destructive)", binding == bindingAfterChange); + assertEquals(binding.getID(), bindingAfterChange.getID()); + + //Check that after the config change we can still consume a message that was sent before, ensuring config change was non-destructive of the queue. + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("hello", ((TextMessage)message).getText()); + consumer.close(); + } + + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setStringProperty("x", "y"); + producer.send(message); + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer.receive(2000)); + consumer.close(); + } + + } finally { + embeddedActiveMQ.stop(); + } + } + @Test public void testRedeployWithFailover() throws Exception { EmbeddedActiveMQ live = new EmbeddedActiveMQ(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 2d78092a51..f1e7051bd0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -156,8 +156,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override - public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception { - return null; + public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "filter", desc = "The filter to use on the queue") String filter, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, + @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers, + @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive, + @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch, + @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch, + @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception { + return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java index fefc735d9b..cc6a6f6484 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java @@ -88,4 +88,63 @@ public class ConfigChangeTest extends ActiveMQTestBase { server.stop(); } + + @Test + public void testChangeQueueFilterOnRestart() throws Exception { + final String filter1 = "x = 'x'"; + final String filter2 = "x = 'y'"; + + Configuration configuration = createDefaultInVMConfig( ); + configuration.addAddressesSetting("#", new AddressSettings()); + + List addressConfigurations = new ArrayList(); + CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration() + .setName("myAddress") + .addRoutingType(RoutingType.ANYCAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setName("myQueue") + .setAddress("myAddress") + .setFilterString(filter1) + .setRoutingType(RoutingType.ANYCAST)); + addressConfigurations.add(addressConfiguration); + configuration.setAddressConfigurations(addressConfigurations); + server = createServer(true, configuration); + server.start(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0"); + try (JMSContext context = connectionFactory.createContext()) { + context.createProducer().setProperty("x", "x").send(context.createQueue("myAddress"), "hello"); + } + + long originalBindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID(); + + server.stop(); + + addressConfiguration = new CoreAddressConfiguration() + .setName("myAddress") + .addRoutingType(RoutingType.ANYCAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setName("myQueue") + .setAddress("myAddress") + .setFilterString(filter2) + .setRoutingType(RoutingType.ANYCAST)); + addressConfigurations.clear(); + addressConfigurations.add(addressConfiguration); + configuration.setAddressConfigurations(addressConfigurations); + + server.start(); + assertEquals(filter2, server.locateQueue(SimpleString.toSimpleString("myQueue")).getFilter().getFilterString().toString()); + + //Ensures the queue is not destroyed by checking message sent before change is consumable after (e.g. no message loss) + try (JMSContext context = connectionFactory.createContext()) { + Message message = context.createConsumer(context.createQueue("myAddress::myQueue")).receive(); + assertEquals("hello", ((TextMessage) message).getText()); + } + + long bindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID(); + assertEquals("Ensure the original queue is not destroyed by checking the binding id is the same", originalBindingId, bindingId); + + server.stop(); + + } } diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml new file mode 100644 index 0000000000..37ef67d388 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml @@ -0,0 +1,42 @@ + + + + + + + false + + + tcp://0.0.0.0:61616 + + + +
+ + + + + +
+
+
+
diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter.xml b/tests/integration-tests/src/test/resources/reload-queue-filter.xml new file mode 100644 index 0000000000..47dbadc687 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-queue-filter.xml @@ -0,0 +1,42 @@ + + + + + + + false + + + tcp://0.0.0.0:61616 + + + +
+ + + + + +
+
+
+
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 71ced7f35a..5297ab6209 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -426,6 +426,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return null; } + @Override + public void setFilter(Filter filter) { + + } + @Override public long getMessageCount() { return messageCount; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 44e5823b2b..0bbe8efeec 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -46,6 +47,7 @@ public class FakePostOffice implements PostOffice { @Override public QueueBinding updateQueue(SimpleString name, RoutingType routingType, + Filter filter, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive,