ARTEMIS-2797 - Reset queue properties by unsetting them in broker.xml

Now it is possible to reset queue parameters to their defaults by removing them
from broker.xml and redeploying the configuration.

Originally this PR covered the "filter" parameter only.
This commit is contained in:
Jan Šmucr 2020-06-09 07:03:53 +02:00
parent 18acd5f8d4
commit 5070e7a72c
9 changed files with 469 additions and 41 deletions

View File

@ -16,22 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
@ -79,6 +63,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueConfigurationUtils;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl; import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
@ -97,6 +82,23 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* This is the class that will make the routing to Queues and decide which consumer will get the messages * This is the class that will make the routing to Queues and decide which consumer will get the messages
* It's the queue component on distributing the messages * * * It's the queue component on distributing the messages * *
@ -632,7 +634,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
boolean changed = false; boolean changed = false;
//validate update //validate update
if (queueConfiguration.getMaxConsumers() != null && queueConfiguration.getMaxConsumers().intValue() != Queue.MAX_CONSUMERS_UNLIMITED) { if (queueConfiguration.getMaxConsumers() != null && queueConfiguration.getMaxConsumers() != Queue.MAX_CONSUMERS_UNLIMITED) {
final int consumerCount = queue.getConsumerCount(); final int consumerCount = queue.getConsumerCount();
if (consumerCount > queueConfiguration.getMaxConsumers()) { if (consumerCount > queueConfiguration.getMaxConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(queueConfiguration.getName().toString(), queueConfiguration.getMaxConsumers(), consumerCount); throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(queueConfiguration.getName().toString(), queueConfiguration.getMaxConsumers(), consumerCount);
@ -647,74 +649,94 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
//atomic update QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(queueConfiguration.getAddress().toString()));
if (queueConfiguration.getMaxConsumers() != null && queue.getMaxConsumers() != queueConfiguration.getMaxConsumers().intValue()) {
// atomic update, reset to defaults if value == null
// maxConsumers
if (queue.getMaxConsumers() != queueConfiguration.getMaxConsumers()) {
changed = true; changed = true;
queue.setMaxConsumer(queueConfiguration.getMaxConsumers()); queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
} }
if (queueConfiguration.getRoutingType() != null && queue.getRoutingType() != queueConfiguration.getRoutingType()) { // routingType
if (queue.getRoutingType() != queueConfiguration.getRoutingType()) {
changed = true; changed = true;
queue.setRoutingType(queueConfiguration.getRoutingType()); queue.setRoutingType(queueConfiguration.getRoutingType());
} }
if (queueConfiguration.isPurgeOnNoConsumers() != null && queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers().booleanValue()) { // purgeOnNoConsumers
if (queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers()) {
changed = true; changed = true;
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers()); queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
} }
if (queueConfiguration.isEnabled() != null && queue.isEnabled() != queueConfiguration.isEnabled().booleanValue()) { // enabled
if (queue.isEnabled() != queueConfiguration.isEnabled()) {
changed = true; changed = true;
queue.setEnabled(queueConfiguration.isEnabled()); queue.setEnabled(queueConfiguration.isEnabled());
} }
if (queueConfiguration.isExclusive() != null && queue.isExclusive() != queueConfiguration.isExclusive().booleanValue()) { // exclusive
if (queue.isExclusive() != queueConfiguration.isExclusive()) {
changed = true; changed = true;
queue.setExclusive(queueConfiguration.isExclusive()); queue.setExclusive(queueConfiguration.isExclusive());
} }
if (queueConfiguration.isGroupRebalance() != null && queue.isGroupRebalance() != queueConfiguration.isGroupRebalance().booleanValue()) { // groupRebalance
if (queue.isGroupRebalance() != queueConfiguration.isGroupRebalance()) {
changed = true; changed = true;
queue.setGroupRebalance(queueConfiguration.isGroupRebalance()); queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
} }
if (queueConfiguration.getGroupBuckets() != null && queue.getGroupBuckets() != queueConfiguration.getGroupBuckets().intValue()) { // groupBuckets
if (queue.getGroupBuckets() != queueConfiguration.getGroupBuckets()) {
changed = true; changed = true;
queue.setGroupBuckets(queueConfiguration.getGroupBuckets()); queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
} }
if (queueConfiguration.getGroupFirstKey() != null && !queueConfiguration.getGroupFirstKey().equals(queue.getGroupFirstKey())) { // groupFirstKey
// Objects.equals() performs the null check for us
if (!Objects.equals(queue.getGroupFirstKey(), queueConfiguration.getGroupFirstKey())) {
changed = true; changed = true;
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey()); queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
} }
if (queueConfiguration.isNonDestructive() != null && queue.isNonDestructive() != queueConfiguration.isNonDestructive().booleanValue()) { // nonDestructive
if (queue.isNonDestructive() != queueConfiguration.isNonDestructive()) {
changed = true; changed = true;
queue.setNonDestructive(queueConfiguration.isNonDestructive()); queue.setNonDestructive(queueConfiguration.isNonDestructive());
} }
if (queueConfiguration.getConsumersBeforeDispatch() != null && !queueConfiguration.getConsumersBeforeDispatch().equals(queue.getConsumersBeforeDispatch())) { // consumersBeforeDispatch
if (queue.getConsumersBeforeDispatch() != queueConfiguration.getConsumersBeforeDispatch()) {
changed = true; changed = true;
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch().intValue()); queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch());
} }
if (queueConfiguration.getDelayBeforeDispatch() != null && !queueConfiguration.getDelayBeforeDispatch().equals(queue.getDelayBeforeDispatch())) { // delayBeforeDispatch
if (queue.getDelayBeforeDispatch() != queueConfiguration.getDelayBeforeDispatch()) {
changed = true; changed = true;
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch().longValue()); queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch());
} }
Filter filter = FilterImpl.createFilter(queueConfiguration.getFilterString()); // filter
if (filter != null && !filter.equals(queue.getFilter())) { // There's no default ActiveMQDefaultConfiguration setting for a filter
final Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString());
if (!Objects.equals(queue.getFilter(), newFilter)) {
changed = true; changed = true;
queue.setFilter(filter); queue.setFilter(newFilter);
} }
if (queueConfiguration.isConfigurationManaged() != null && !queueConfiguration.isConfigurationManaged().equals(queue.isConfigurationManaged())) { // configurationManaged
changed = true; if (queueConfiguration.isConfigurationManaged() != queue.isConfigurationManaged()) {
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged()); queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
changed = true;
} }
/* Why is this?
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (queueConfiguration.getUser() == null && queue.getUser() != null) { if (queueConfiguration.getUser() == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user"); logger.debug("Ignoring updating Queue to a NULL user");
} }
} }
if (queueConfiguration.getUser() != null && !queueConfiguration.getUser().equals(queue.getUser())) { */
// user
if (!Objects.equals(queue.getUser(), queueConfiguration.getUser())) {
changed = true; changed = true;
queue.setUser(queueConfiguration.getUser()); queue.setUser(queueConfiguration.getUser());
} }
if (queueConfiguration.getRingSize() != null && !queueConfiguration.getRingSize().equals(queue.getRingSize())) { // ringSize
if (queue.getRingSize() != queueConfiguration.getRingSize()) {
changed = true; changed = true;
queue.setRingSize(queueConfiguration.getRingSize()); queue.setRingSize(queueConfiguration.getRingSize());
} }
if (changed) { if (changed) {
final long txID = storageManager.generateID(); final long txID = storageManager.generateID();
try { try {

View File

@ -244,8 +244,8 @@ attribute `name` | N/A | X | A queue with new name will be deployed and the queu
attribute `max-consumers` | If max-consumers > current consumers max-consumers will update on reload | max-consumers will be set back to the default `-1` | If max-consumers > current consumers max-consumers will update on reload attribute `max-consumers` | If max-consumers > current consumers max-consumers will update on reload | max-consumers will be set back to the default `-1` | If max-consumers > current consumers max-consumers will update on reload
attribute `purge-on-no-consumers` | On reload purge-on-no-consumers will be updated | Will be set back to the default `false` | On reload purge-on-no-consumers will be updated attribute `purge-on-no-consumers` | On reload purge-on-no-consumers will be updated | Will be set back to the default `false` | On reload purge-on-no-consumers will be updated
attribute `address` | N/A | No effect unless starting broker | No effect unless starting broker attribute `address` | N/A | No effect unless starting broker | No effect unless starting broker
attribute `filter` | N/A | No effect unless starting broker | No effect unless starting broker attribute `filter` | The filter will be added after reloading | The filter will be removed after reloading | The filter will be updated after reloading
attribute `durable` | N/A | No effect unless starting broker | No effect unless starting broker attribute `durable` | The queue durability will be set to the given value after reloading | The queue durability will be set to the default `true` after reloading | The queue durability will be set to the new value after reloading
### `<jms>` *(Deprecated)* ### `<jms>` *(Deprecated)*

View File

@ -8,6 +8,17 @@ This chapter provides the following information for each release:
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md) - **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
chapter in addition to any version-specific upgrade instructions outlined here. chapter in addition to any version-specific upgrade instructions outlined here.
## 2.14.0
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12348290).
Highlights:
- Removing `broker.xml` queue parameters now causes these to be reset to their default values.
#### Upgrading from older versions
Make sure the existing queues have their parameters set according to the `broker.xml` values before upgrading.
## 2.11.0 ## 2.11.0
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12346258). [Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12346258).

View File

@ -37,10 +37,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
@ -287,6 +290,164 @@ public class RedeployTest extends ActiveMQTestBase {
} }
} }
private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
server.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
}
private void doTestRemoveFilter(URL testConfiguration) throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
deployBrokerConfig(embeddedActiveMQ, baseConfig);
try {
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myFilterQueue");
// Test that the original filter has been set up
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
.getBinding(new SimpleString("myFilterQueue"));
// The "x = 'x'" value is found in "reload-queue-filter.xml"
assertEquals("x = 'x'", queueBinding.getFilter().getFilterString().toString());
MessageProducer producer = session.createProducer(queue);
// Test that the original filter affects the flow
Message passingMessage = session.createMessage();
passingMessage.setStringProperty("x", "x");
producer.send(passingMessage);
Message filteredMessage = session.createMessage();
filteredMessage.setStringProperty("x", "y");
producer.send(filteredMessage);
MessageConsumer consumer = session.createConsumer(queue);
Message receivedMessage = consumer.receive(2000);
assertNotNull(receivedMessage);
assertEquals("x", receivedMessage.getStringProperty("x"));
assertNull(consumer.receive(2000));
consumer.close();
}
deployBrokerConfig(embeddedActiveMQ, testConfiguration);
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Queue queue = session.createQueue("myFilterQueue");
// Test that the filter has been removed
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
.getBinding(new SimpleString("myFilterQueue"));
assertNull(queueBinding.getFilter());
MessageProducer producer = session.createProducer(queue);
// Test that the original filter no longer affects the flow
Message message1 = session.createMessage();
message1.setStringProperty("x", "x");
producer.send(message1);
Message message2 = session.createMessage();
message2.setStringProperty("x", "y");
producer.send(message2);
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull(consumer.receive(2000));
assertNotNull(consumer.receive(2000));
consumer.close();
}
} finally {
embeddedActiveMQ.stop();
}
}
@Test
public void testRedeployRemoveFilter() throws Exception {
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated-empty.xml"));
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
}
@Test
public void testRedeployQueueDefaults() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-before.xml");
URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-after.xml");
Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
try {
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
.getBinding(new SimpleString("myQueue"));
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
assertNotEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
assertNotEquals(queue.getRoutingType(), RoutingType.MULTICAST);
assertNotEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
assertNotEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
assertNotEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
assertNotEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
assertNotEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
assertNotEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
assertNotEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
assertNotEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
assertNotEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
assertNotEquals(queue.getFilter(), null);
assertNotEquals(queue.getUser(), "jdoe");
assertNotEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
deployBrokerConfig(embeddedActiveMQ, newConfig);
assertEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
assertEquals(queue.getRoutingType(), RoutingType.MULTICAST);
assertEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
assertEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
assertEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
assertEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
assertEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
assertEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
assertEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
assertEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
assertEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
assertEquals(queue.getFilter(), null);
assertEquals(queue.getUser(), null);
assertEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
} finally {
embeddedActiveMQ.stop();
}
}
@Test @Test
public void testRedeployWithFailover() throws Exception { public void testRedeployWithFailover() throws Exception {
Set<Role> original = new HashSet<>(); Set<Role> original = new HashSet<>();

View File

@ -471,6 +471,53 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address)); checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
} }
@Test
public void testRemoveQueueFilter() throws Exception {
String address = RandomUtil.randomString();
QueueConfiguration queue1 = new QueueConfiguration("q1")
.setAddress(address)
.setFilterString("hello='world'");
QueueConfiguration queue2 = new QueueConfiguration("q2")
.setAddress(address)
.setFilterString("hello='darling'");
ActiveMQServerControl serverControl = createManagementControl();
serverControl.createAddress(address, "MULTICAST");
if (legacyCreateQueue) {
serverControl.createQueue(address, queue1.getName().toString(), queue1.getFilterString().toString(), queue1.isDurable());
serverControl.createQueue(address, queue2.getName().toString(), queue2.getFilterString().toString(), queue2.isDurable());
} else {
serverControl.createQueue(queue1.toJSON());
serverControl.createQueue(queue2.toJSON());
}
ServerLocator loc = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(loc);
ClientSession session = csf.createSession();
session.start();
ClientProducer producer = session.createProducer(address);
ClientConsumer consumer1 = session.createConsumer("q1");
ClientConsumer consumer2 = session.createConsumer("q2");
ClientMessage m = session.createMessage(true);
m.putStringProperty("hello", "world");
producer.send(m);
assertNotNull(consumer1.receiveImmediate());
assertNull(consumer2.receiveImmediate());
serverControl.updateQueue(queue2.setFilterString((String) null).toJSON());
producer.send(m);
assertNotNull(consumer1.receiveImmediate());
assertNotNull(consumer2.receiveImmediate());
}
@Test @Test
public void testCreateAndDestroyQueueClosingConsumers() throws Exception { public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();

View File

@ -0,0 +1,43 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myQueue">
<multicast>
<!-- Shoud reset queue according to org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration -->
<!-- some values cannot be changed: last-value -->
<queue name="myQueue" />
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,59 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myQueue">
<anycast>
<!-- non-default values based on org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration -->
<!-- some values cannot be changed: last-value -->
<queue name="myQueue"
max-consumers="10"
purge-on-no-consumers="true"
exclusive="true"
group-rebalance="true"
group-buckets="10"
group-first-key="foo"
non-destructive="true"
consumers-before-dispatch="10"
delay-before-dispatch="10"
ring-size="10"
enabled="false"
>
<durable>false</durable>
<filter string="x = 'x'"/>
<user>jdoe</user>
</queue>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,42 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myFilterQueue">
<anycast>
<queue name="myFilterQueue">
</queue>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,43 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
<persistence-enabled>false</persistence-enabled>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
<addresses>
<address name="myFilterQueue">
<anycast>
<queue name="myFilterQueue">
<filter string=""/>
</queue>
</anycast>
</address>
</addresses>
</core>
</configuration>