This closes #3172
This commit is contained in:
commit
b9eca30008
|
@ -16,22 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||||
|
@ -79,6 +63,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.QueueConfigurationUtils;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||||
|
@ -97,6 +82,23 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the class that will make the routing to Queues and decide which consumer will get the messages
|
* This is the class that will make the routing to Queues and decide which consumer will get the messages
|
||||||
* It's the queue component on distributing the messages * *
|
* It's the queue component on distributing the messages * *
|
||||||
|
@ -632,7 +634,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
|
|
||||||
//validate update
|
//validate update
|
||||||
if (queueConfiguration.getMaxConsumers() != null && queueConfiguration.getMaxConsumers().intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
if (queueConfiguration.getMaxConsumers() != null && queueConfiguration.getMaxConsumers() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
||||||
final int consumerCount = queue.getConsumerCount();
|
final int consumerCount = queue.getConsumerCount();
|
||||||
if (consumerCount > queueConfiguration.getMaxConsumers()) {
|
if (consumerCount > queueConfiguration.getMaxConsumers()) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(queueConfiguration.getName().toString(), queueConfiguration.getMaxConsumers(), consumerCount);
|
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(queueConfiguration.getName().toString(), queueConfiguration.getMaxConsumers(), consumerCount);
|
||||||
|
@ -647,74 +649,94 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//atomic update
|
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(queueConfiguration.getAddress().toString()));
|
||||||
if (queueConfiguration.getMaxConsumers() != null && queue.getMaxConsumers() != queueConfiguration.getMaxConsumers().intValue()) {
|
|
||||||
|
// atomic update, reset to defaults if value == null
|
||||||
|
// maxConsumers
|
||||||
|
if (queue.getMaxConsumers() != queueConfiguration.getMaxConsumers()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
|
queue.setMaxConsumer(queueConfiguration.getMaxConsumers());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getRoutingType() != null && queue.getRoutingType() != queueConfiguration.getRoutingType()) {
|
// routingType
|
||||||
|
if (queue.getRoutingType() != queueConfiguration.getRoutingType()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setRoutingType(queueConfiguration.getRoutingType());
|
queue.setRoutingType(queueConfiguration.getRoutingType());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isPurgeOnNoConsumers() != null && queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers().booleanValue()) {
|
// purgeOnNoConsumers
|
||||||
|
if (queue.isPurgeOnNoConsumers() != queueConfiguration.isPurgeOnNoConsumers()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
|
queue.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isEnabled() != null && queue.isEnabled() != queueConfiguration.isEnabled().booleanValue()) {
|
// enabled
|
||||||
|
if (queue.isEnabled() != queueConfiguration.isEnabled()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setEnabled(queueConfiguration.isEnabled());
|
queue.setEnabled(queueConfiguration.isEnabled());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isExclusive() != null && queue.isExclusive() != queueConfiguration.isExclusive().booleanValue()) {
|
// exclusive
|
||||||
|
if (queue.isExclusive() != queueConfiguration.isExclusive()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setExclusive(queueConfiguration.isExclusive());
|
queue.setExclusive(queueConfiguration.isExclusive());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isGroupRebalance() != null && queue.isGroupRebalance() != queueConfiguration.isGroupRebalance().booleanValue()) {
|
// groupRebalance
|
||||||
|
if (queue.isGroupRebalance() != queueConfiguration.isGroupRebalance()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
|
queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getGroupBuckets() != null && queue.getGroupBuckets() != queueConfiguration.getGroupBuckets().intValue()) {
|
// groupBuckets
|
||||||
|
if (queue.getGroupBuckets() != queueConfiguration.getGroupBuckets()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
|
queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getGroupFirstKey() != null && !queueConfiguration.getGroupFirstKey().equals(queue.getGroupFirstKey())) {
|
// groupFirstKey
|
||||||
|
// Objects.equals() performs the null check for us
|
||||||
|
if (!Objects.equals(queue.getGroupFirstKey(), queueConfiguration.getGroupFirstKey())) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
|
queue.setGroupFirstKey(queueConfiguration.getGroupFirstKey());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isNonDestructive() != null && queue.isNonDestructive() != queueConfiguration.isNonDestructive().booleanValue()) {
|
// nonDestructive
|
||||||
|
if (queue.isNonDestructive() != queueConfiguration.isNonDestructive()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setNonDestructive(queueConfiguration.isNonDestructive());
|
queue.setNonDestructive(queueConfiguration.isNonDestructive());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getConsumersBeforeDispatch() != null && !queueConfiguration.getConsumersBeforeDispatch().equals(queue.getConsumersBeforeDispatch())) {
|
// consumersBeforeDispatch
|
||||||
|
if (queue.getConsumersBeforeDispatch() != queueConfiguration.getConsumersBeforeDispatch()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch().intValue());
|
queue.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getDelayBeforeDispatch() != null && !queueConfiguration.getDelayBeforeDispatch().equals(queue.getDelayBeforeDispatch())) {
|
// delayBeforeDispatch
|
||||||
|
if (queue.getDelayBeforeDispatch() != queueConfiguration.getDelayBeforeDispatch()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch().longValue());
|
queue.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch());
|
||||||
}
|
}
|
||||||
Filter filter = FilterImpl.createFilter(queueConfiguration.getFilterString());
|
// filter
|
||||||
if (filter != null && !filter.equals(queue.getFilter())) {
|
// There's no default ActiveMQDefaultConfiguration setting for a filter
|
||||||
|
final Filter newFilter = FilterImpl.createFilter(queueConfiguration.getFilterString());
|
||||||
|
if (!Objects.equals(queue.getFilter(), newFilter)) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setFilter(filter);
|
queue.setFilter(newFilter);
|
||||||
}
|
}
|
||||||
if (queueConfiguration.isConfigurationManaged() != null && !queueConfiguration.isConfigurationManaged().equals(queue.isConfigurationManaged())) {
|
// configurationManaged
|
||||||
changed = true;
|
if (queueConfiguration.isConfigurationManaged() != queue.isConfigurationManaged()) {
|
||||||
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
|
queue.setConfigurationManaged(queueConfiguration.isConfigurationManaged());
|
||||||
|
changed = true;
|
||||||
}
|
}
|
||||||
|
/* Why is this?
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
if (queueConfiguration.getUser() == null && queue.getUser() != null) {
|
if (queueConfiguration.getUser() == null && queue.getUser() != null) {
|
||||||
logger.debug("Ignoring updating Queue to a NULL user");
|
logger.debug("Ignoring updating Queue to a NULL user");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getUser() != null && !queueConfiguration.getUser().equals(queue.getUser())) {
|
*/
|
||||||
|
// user
|
||||||
|
if (!Objects.equals(queue.getUser(), queueConfiguration.getUser())) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setUser(queueConfiguration.getUser());
|
queue.setUser(queueConfiguration.getUser());
|
||||||
}
|
}
|
||||||
if (queueConfiguration.getRingSize() != null && !queueConfiguration.getRingSize().equals(queue.getRingSize())) {
|
// ringSize
|
||||||
|
if (queue.getRingSize() != queueConfiguration.getRingSize()) {
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setRingSize(queueConfiguration.getRingSize());
|
queue.setRingSize(queueConfiguration.getRingSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
final long txID = storageManager.generateID();
|
final long txID = storageManager.generateID();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -244,8 +244,8 @@ attribute `name` | N/A | X | A queue with new name will be deployed and the queu
|
||||||
attribute `max-consumers` | If max-consumers > current consumers max-consumers will update on reload | max-consumers will be set back to the default `-1` | If max-consumers > current consumers max-consumers will update on reload
|
attribute `max-consumers` | If max-consumers > current consumers max-consumers will update on reload | max-consumers will be set back to the default `-1` | If max-consumers > current consumers max-consumers will update on reload
|
||||||
attribute `purge-on-no-consumers` | On reload purge-on-no-consumers will be updated | Will be set back to the default `false` | On reload purge-on-no-consumers will be updated
|
attribute `purge-on-no-consumers` | On reload purge-on-no-consumers will be updated | Will be set back to the default `false` | On reload purge-on-no-consumers will be updated
|
||||||
attribute `address` | N/A | No effect unless starting broker | No effect unless starting broker
|
attribute `address` | N/A | No effect unless starting broker | No effect unless starting broker
|
||||||
attribute `filter` | N/A | No effect unless starting broker | No effect unless starting broker
|
attribute `filter` | The filter will be added after reloading | The filter will be removed after reloading | The filter will be updated after reloading
|
||||||
attribute `durable` | N/A | No effect unless starting broker | No effect unless starting broker
|
attribute `durable` | The queue durability will be set to the given value after reloading | The queue durability will be set to the default `true` after reloading | The queue durability will be set to the new value after reloading
|
||||||
|
|
||||||
### `<jms>` *(Deprecated)*
|
### `<jms>` *(Deprecated)*
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,17 @@ This chapter provides the following information for each release:
|
||||||
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
|
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
|
||||||
chapter in addition to any version-specific upgrade instructions outlined here.
|
chapter in addition to any version-specific upgrade instructions outlined here.
|
||||||
|
|
||||||
|
## 2.14.0
|
||||||
|
|
||||||
|
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12348290).
|
||||||
|
|
||||||
|
Highlights:
|
||||||
|
- Removing `broker.xml` queue parameters now causes these to be reset to their default values.
|
||||||
|
|
||||||
|
#### Upgrading from older versions
|
||||||
|
|
||||||
|
Make sure the existing queues have their parameters set according to the `broker.xml` values before upgrading.
|
||||||
|
|
||||||
## 2.11.0
|
## 2.11.0
|
||||||
|
|
||||||
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12346258).
|
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12346258).
|
||||||
|
|
|
@ -37,10 +37,13 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
||||||
|
@ -287,6 +290,164 @@ public class RedeployTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception {
|
||||||
|
|
||||||
|
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||||
|
Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
|
||||||
|
final ReusableLatch latch = new ReusableLatch(1);
|
||||||
|
Runnable tick = latch::countDown;
|
||||||
|
server.getActiveMQServer().getReloadManager().setTick(tick);
|
||||||
|
|
||||||
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doTestRemoveFilter(URL testConfiguration) throws Exception {
|
||||||
|
|
||||||
|
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||||
|
|
||||||
|
URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
|
||||||
|
|
||||||
|
Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
|
||||||
|
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||||
|
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||||
|
embeddedActiveMQ.start();
|
||||||
|
|
||||||
|
deployBrokerConfig(embeddedActiveMQ, baseConfig);
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Queue queue = session.createQueue("myFilterQueue");
|
||||||
|
|
||||||
|
// Test that the original filter has been set up
|
||||||
|
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||||
|
.getBinding(new SimpleString("myFilterQueue"));
|
||||||
|
// The "x = 'x'" value is found in "reload-queue-filter.xml"
|
||||||
|
assertEquals("x = 'x'", queueBinding.getFilter().getFilterString().toString());
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
// Test that the original filter affects the flow
|
||||||
|
Message passingMessage = session.createMessage();
|
||||||
|
passingMessage.setStringProperty("x", "x");
|
||||||
|
producer.send(passingMessage);
|
||||||
|
|
||||||
|
Message filteredMessage = session.createMessage();
|
||||||
|
filteredMessage.setStringProperty("x", "y");
|
||||||
|
producer.send(filteredMessage);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message receivedMessage = consumer.receive(2000);
|
||||||
|
assertNotNull(receivedMessage);
|
||||||
|
assertEquals("x", receivedMessage.getStringProperty("x"));
|
||||||
|
|
||||||
|
assertNull(consumer.receive(2000));
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
deployBrokerConfig(embeddedActiveMQ, testConfiguration);
|
||||||
|
|
||||||
|
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Queue queue = session.createQueue("myFilterQueue");
|
||||||
|
|
||||||
|
// Test that the filter has been removed
|
||||||
|
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||||
|
.getBinding(new SimpleString("myFilterQueue"));
|
||||||
|
assertNull(queueBinding.getFilter());
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
// Test that the original filter no longer affects the flow
|
||||||
|
Message message1 = session.createMessage();
|
||||||
|
message1.setStringProperty("x", "x");
|
||||||
|
producer.send(message1);
|
||||||
|
|
||||||
|
Message message2 = session.createMessage();
|
||||||
|
message2.setStringProperty("x", "y");
|
||||||
|
producer.send(message2);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
assertNotNull(consumer.receive(2000));
|
||||||
|
assertNotNull(consumer.receive(2000));
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
embeddedActiveMQ.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRedeployRemoveFilter() throws Exception {
|
||||||
|
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated-empty.xml"));
|
||||||
|
doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRedeployQueueDefaults() throws Exception {
|
||||||
|
|
||||||
|
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||||
|
URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-before.xml");
|
||||||
|
URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-after.xml");
|
||||||
|
Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||||
|
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||||
|
embeddedActiveMQ.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||||
|
.getBinding(new SimpleString("myQueue"));
|
||||||
|
org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
|
||||||
|
|
||||||
|
assertNotEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||||
|
assertNotEquals(queue.getRoutingType(), RoutingType.MULTICAST);
|
||||||
|
assertNotEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
|
||||||
|
assertNotEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
|
||||||
|
assertNotEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
|
||||||
|
assertNotEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
|
||||||
|
assertNotEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
|
||||||
|
assertNotEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
|
||||||
|
assertNotEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
|
||||||
|
assertNotEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
|
||||||
|
assertNotEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
|
||||||
|
assertNotEquals(queue.getFilter(), null);
|
||||||
|
assertNotEquals(queue.getUser(), "jdoe");
|
||||||
|
assertNotEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
|
||||||
|
|
||||||
|
deployBrokerConfig(embeddedActiveMQ, newConfig);
|
||||||
|
|
||||||
|
assertEquals(queue.getMaxConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||||
|
assertEquals(queue.getRoutingType(), RoutingType.MULTICAST);
|
||||||
|
assertEquals(queue.isPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers());
|
||||||
|
assertEquals(queue.isEnabled(), ActiveMQDefaultConfiguration.getDefaultEnabled());
|
||||||
|
assertEquals(queue.isExclusive(), ActiveMQDefaultConfiguration.getDefaultExclusive());
|
||||||
|
assertEquals(queue.isGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance());
|
||||||
|
assertEquals(queue.getGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets());
|
||||||
|
assertEquals(queue.getGroupFirstKey(), ActiveMQDefaultConfiguration.getDefaultGroupFirstKey());
|
||||||
|
assertEquals(queue.isNonDestructive(), ActiveMQDefaultConfiguration.getDefaultNonDestructive());
|
||||||
|
assertEquals(queue.getConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch());
|
||||||
|
assertEquals(queue.getDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
|
||||||
|
assertEquals(queue.getFilter(), null);
|
||||||
|
assertEquals(queue.getUser(), null);
|
||||||
|
assertEquals(queue.getRingSize(), ActiveMQDefaultConfiguration.getDefaultRingSize());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
embeddedActiveMQ.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRedeployWithFailover() throws Exception {
|
public void testRedeployWithFailover() throws Exception {
|
||||||
Set<Role> original = new HashSet<>();
|
Set<Role> original = new HashSet<>();
|
||||||
|
|
|
@ -471,6 +471,53 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
|
checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveQueueFilter() throws Exception {
|
||||||
|
|
||||||
|
String address = RandomUtil.randomString();
|
||||||
|
QueueConfiguration queue1 = new QueueConfiguration("q1")
|
||||||
|
.setAddress(address)
|
||||||
|
.setFilterString("hello='world'");
|
||||||
|
|
||||||
|
QueueConfiguration queue2 = new QueueConfiguration("q2")
|
||||||
|
.setAddress(address)
|
||||||
|
.setFilterString("hello='darling'");
|
||||||
|
|
||||||
|
ActiveMQServerControl serverControl = createManagementControl();
|
||||||
|
serverControl.createAddress(address, "MULTICAST");
|
||||||
|
|
||||||
|
if (legacyCreateQueue) {
|
||||||
|
serverControl.createQueue(address, queue1.getName().toString(), queue1.getFilterString().toString(), queue1.isDurable());
|
||||||
|
serverControl.createQueue(address, queue2.getName().toString(), queue2.getFilterString().toString(), queue2.isDurable());
|
||||||
|
} else {
|
||||||
|
serverControl.createQueue(queue1.toJSON());
|
||||||
|
serverControl.createQueue(queue2.toJSON());
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerLocator loc = createInVMNonHALocator();
|
||||||
|
ClientSessionFactory csf = createSessionFactory(loc);
|
||||||
|
ClientSession session = csf.createSession();
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
ClientConsumer consumer1 = session.createConsumer("q1");
|
||||||
|
ClientConsumer consumer2 = session.createConsumer("q2");
|
||||||
|
|
||||||
|
ClientMessage m = session.createMessage(true);
|
||||||
|
m.putStringProperty("hello", "world");
|
||||||
|
producer.send(m);
|
||||||
|
|
||||||
|
assertNotNull(consumer1.receiveImmediate());
|
||||||
|
assertNull(consumer2.receiveImmediate());
|
||||||
|
|
||||||
|
serverControl.updateQueue(queue2.setFilterString((String) null).toJSON());
|
||||||
|
|
||||||
|
producer.send(m);
|
||||||
|
|
||||||
|
assertNotNull(consumer1.receiveImmediate());
|
||||||
|
assertNotNull(consumer2.receiveImmediate());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
|
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
|
||||||
SimpleString address = RandomUtil.randomSimpleString();
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
<?xml version='1.0'?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
or more contributor license agreements. See the NOTICE file
|
||||||
|
distributed with this work for additional information
|
||||||
|
regarding copyright ownership. The ASF licenses this file
|
||||||
|
to you under the Apache License, Version 2.0 (the
|
||||||
|
"License"); you may not use this file except in compliance
|
||||||
|
with the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing,
|
||||||
|
software distributed under the License is distributed on an
|
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
KIND, either express or implied. See the License for the
|
||||||
|
specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration xmlns="urn:activemq"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||||
|
|
||||||
|
<core xmlns="urn:activemq:core">
|
||||||
|
<security-enabled>false</security-enabled>
|
||||||
|
<persistence-enabled>false</persistence-enabled>
|
||||||
|
|
||||||
|
<acceptors>
|
||||||
|
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
|
||||||
|
</acceptors>
|
||||||
|
|
||||||
|
<addresses>
|
||||||
|
<address name="myQueue">
|
||||||
|
<multicast>
|
||||||
|
<!-- Shoud reset queue according to org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration -->
|
||||||
|
<!-- some values cannot be changed: last-value -->
|
||||||
|
<queue name="myQueue" />
|
||||||
|
</multicast>
|
||||||
|
</address>
|
||||||
|
</addresses>
|
||||||
|
</core>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,59 @@
|
||||||
|
<?xml version='1.0'?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
or more contributor license agreements. See the NOTICE file
|
||||||
|
distributed with this work for additional information
|
||||||
|
regarding copyright ownership. The ASF licenses this file
|
||||||
|
to you under the Apache License, Version 2.0 (the
|
||||||
|
"License"); you may not use this file except in compliance
|
||||||
|
with the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing,
|
||||||
|
software distributed under the License is distributed on an
|
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
KIND, either express or implied. See the License for the
|
||||||
|
specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration xmlns="urn:activemq"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||||
|
|
||||||
|
<core xmlns="urn:activemq:core">
|
||||||
|
<security-enabled>false</security-enabled>
|
||||||
|
<persistence-enabled>false</persistence-enabled>
|
||||||
|
|
||||||
|
<acceptors>
|
||||||
|
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
|
||||||
|
</acceptors>
|
||||||
|
|
||||||
|
<addresses>
|
||||||
|
<address name="myQueue">
|
||||||
|
<anycast>
|
||||||
|
<!-- non-default values based on org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration -->
|
||||||
|
<!-- some values cannot be changed: last-value -->
|
||||||
|
<queue name="myQueue"
|
||||||
|
max-consumers="10"
|
||||||
|
purge-on-no-consumers="true"
|
||||||
|
exclusive="true"
|
||||||
|
group-rebalance="true"
|
||||||
|
group-buckets="10"
|
||||||
|
group-first-key="foo"
|
||||||
|
non-destructive="true"
|
||||||
|
consumers-before-dispatch="10"
|
||||||
|
delay-before-dispatch="10"
|
||||||
|
ring-size="10"
|
||||||
|
enabled="false"
|
||||||
|
>
|
||||||
|
<durable>false</durable>
|
||||||
|
<filter string="x = 'x'"/>
|
||||||
|
<user>jdoe</user>
|
||||||
|
</queue>
|
||||||
|
</anycast>
|
||||||
|
</address>
|
||||||
|
</addresses>
|
||||||
|
</core>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,42 @@
|
||||||
|
<?xml version='1.0'?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
or more contributor license agreements. See the NOTICE file
|
||||||
|
distributed with this work for additional information
|
||||||
|
regarding copyright ownership. The ASF licenses this file
|
||||||
|
to you under the Apache License, Version 2.0 (the
|
||||||
|
"License"); you may not use this file except in compliance
|
||||||
|
with the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing,
|
||||||
|
software distributed under the License is distributed on an
|
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
KIND, either express or implied. See the License for the
|
||||||
|
specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration xmlns="urn:activemq"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||||
|
|
||||||
|
<core xmlns="urn:activemq:core">
|
||||||
|
<security-enabled>false</security-enabled>
|
||||||
|
<persistence-enabled>false</persistence-enabled>
|
||||||
|
|
||||||
|
<acceptors>
|
||||||
|
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
|
||||||
|
</acceptors>
|
||||||
|
|
||||||
|
<addresses>
|
||||||
|
<address name="myFilterQueue">
|
||||||
|
<anycast>
|
||||||
|
<queue name="myFilterQueue">
|
||||||
|
</queue>
|
||||||
|
</anycast>
|
||||||
|
</address>
|
||||||
|
</addresses>
|
||||||
|
</core>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,43 @@
|
||||||
|
<?xml version='1.0'?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
or more contributor license agreements. See the NOTICE file
|
||||||
|
distributed with this work for additional information
|
||||||
|
regarding copyright ownership. The ASF licenses this file
|
||||||
|
to you under the Apache License, Version 2.0 (the
|
||||||
|
"License"); you may not use this file except in compliance
|
||||||
|
with the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing,
|
||||||
|
software distributed under the License is distributed on an
|
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
KIND, either express or implied. See the License for the
|
||||||
|
specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration xmlns="urn:activemq"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||||
|
|
||||||
|
<core xmlns="urn:activemq:core">
|
||||||
|
<security-enabled>false</security-enabled>
|
||||||
|
<persistence-enabled>false</persistence-enabled>
|
||||||
|
|
||||||
|
<acceptors>
|
||||||
|
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
|
||||||
|
</acceptors>
|
||||||
|
|
||||||
|
<addresses>
|
||||||
|
<address name="myFilterQueue">
|
||||||
|
<anycast>
|
||||||
|
<queue name="myFilterQueue">
|
||||||
|
<filter string=""/>
|
||||||
|
</queue>
|
||||||
|
</anycast>
|
||||||
|
</address>
|
||||||
|
</addresses>
|
||||||
|
</core>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue