ARTEMIS-1235 delete queues when broker.xml changes

Add extra configuration to address-settings to be able to
control / enable address/queue deletion by pattern,
rather than a global toggle.

Add support in the reload logic to remove address
and/or queues if the address matches an address setting,
where it is enabled.
This commit is contained in:
Michael Andre Pearce 2017-06-17 06:43:02 +01:00 committed by Clebert Suconic
parent be8eb3ec9f
commit f63f130407
14 changed files with 616 additions and 5 deletions

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
/**
@ -156,6 +157,16 @@ public final class Validators {
}
};
public static final Validator DELETION_POLICY_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null || !val.equals(DeletionPolicy.OFF.toString()) && !val.equals(DeletionPolicy.FORCE.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidDeletionPolicyType(val);
}
}
};
public static final Validator MESSAGE_LOAD_BALANCING_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.utils.ByteUtil;
@ -193,10 +194,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String AUTO_DELETE_QUEUES = "auto-delete-queues";
private static final String CONFIG_DELETE_QUEUES = "config-delete-queues";
private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses";
private static final String AUTO_DELETE_ADDRESSES = "auto-delete-addresses";
private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses";
private static final String DEFAULT_PURGE_ON_NO_CONSUMERS = "default-purge-on-no-consumers";
private static final String DEFAULT_MAX_CONSUMERS = "default-max-consumers";
@ -985,10 +990,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setAutoCreateQueues(XMLUtil.parseBoolean(child));
} else if (AUTO_DELETE_QUEUES.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteQueues(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteQueues(policy);
} else if (AUTO_CREATE_ADDRESSES.equalsIgnoreCase(name)) {
addressSettings.setAutoCreateAddresses(XMLUtil.parseBoolean(child));
} else if (AUTO_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
addressSettings.setAutoDeleteAddresses(XMLUtil.parseBoolean(child));
} else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteAddresses(policy);
} else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
} else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) {

View File

@ -427,5 +427,7 @@ public interface ActiveMQMessageBundle {
String address,
Set<RoutingType> supportedRoutingTypes);
@Message(id = 119212, value = "Invalid deletion policy type {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidDeletionPolicyType(String val);
}

View File

@ -1582,4 +1582,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224075, value = "Cannot find pageTX id = {0}", format = Message.Format.MESSAGE_FORMAT)
void journalCannotFindPageTX(Long id);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployAddress(SimpleString addressName);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName);
}

View File

@ -47,6 +47,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
@ -151,6 +152,7 @@ import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -2238,6 +2240,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues
deployQueuesFromConfiguration();
// Undeploy any addresses and queues not in config
undeployAddressesAndQueueNotInConfiguration();
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
@ -2313,6 +2318,53 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private void undeployAddressesAndQueueNotInConfiguration() throws Exception {
undeployAddressesAndQueueNotInConfiguration(configuration);
}
private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());
for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) {
for (Queue queue : listQueues(addressName)) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
ActiveMQServerLogger.LOGGER.undeployAddress(addressName);
removeAddressInfo(addressName, null);
} else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) {
for (Queue queue : listConfiguredQueues(addressName)) {
if (!queuesInConfig.contains(queue.getName().toString())) {
ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName());
queue.deleteQueue(true);
}
}
}
}
}
private Set<SimpleString> listAddressNames() {
return postOffice.getAddresses();
}
private List<Queue> listConfiguredQueues(SimpleString address) throws Exception {
return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList());
}
private List<Queue> listQueues(SimpleString address) throws Exception {
return postOffice.listQueuesForAddress(address);
}
private void deployAddressesFromConfiguration() throws Exception {
deployAddressesFromConfiguration(configuration);
}
@ -2818,6 +2870,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
deployAddressesFromConfiguration(config);
undeployAddressesAndQueueNotInConfiguration(config);
}
}

View File

@ -71,10 +71,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF;
public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true;
public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
public static final long DEFAULT_EXPIRY_DELAY = -1;
@ -148,10 +152,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Boolean autoDeleteQueues = null;
private DeletionPolicy configDeleteQueues = null;
private Boolean autoCreateAddresses = null;
private Boolean autoDeleteAddresses = null;
private DeletionPolicy configDeleteAddresses = null;
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
private Long maxSizeBytesRejectThreshold = null;
@ -194,8 +202,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
this.autoCreateQueues = other.autoCreateQueues;
this.autoDeleteQueues = other.autoDeleteQueues;
this.configDeleteQueues = other.configDeleteQueues;
this.autoCreateAddresses = other.autoCreateAddresses;
this.autoDeleteAddresses = other.autoDeleteAddresses;
this.configDeleteAddresses = other.configDeleteAddresses;
this.managementBrowsePageSize = other.managementBrowsePageSize;
this.queuePrefetch = other.queuePrefetch;
this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
@ -270,6 +280,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public DeletionPolicy getConfigDeleteQueues() {
return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES;
}
public AddressSettings setConfigDeleteQueues(DeletionPolicy configDeleteQueues) {
this.configDeleteQueues = configDeleteQueues;
return this;
}
public boolean isAutoCreateAddresses() {
return autoCreateAddresses != null ? autoCreateAddresses : AddressSettings.DEFAULT_AUTO_CREATE_ADDRESSES;
}
@ -288,6 +307,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public DeletionPolicy getConfigDeleteAddresses() {
return configDeleteAddresses != null ? configDeleteAddresses : AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES;
}
public AddressSettings setConfigDeleteAddresses(DeletionPolicy configDeleteAddresses) {
this.configDeleteAddresses = configDeleteAddresses;
return this;
}
public int getDefaultMaxConsumers() {
return defaultMaxConsumers != null ? defaultMaxConsumers : ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
}
@ -594,12 +622,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (autoDeleteQueues == null) {
autoDeleteQueues = merged.autoDeleteQueues;
}
if (configDeleteQueues == null) {
configDeleteQueues = merged.configDeleteQueues;
}
if (autoCreateAddresses == null) {
autoCreateAddresses = merged.autoCreateAddresses;
}
if (autoDeleteAddresses == null) {
autoDeleteAddresses = merged.autoDeleteAddresses;
}
if (configDeleteAddresses == null) {
configDeleteAddresses = merged.configDeleteAddresses;
}
if (managementBrowsePageSize == null) {
managementBrowsePageSize = merged.managementBrowsePageSize;
}
@ -687,10 +721,25 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteQueues = BufferHelper.readNullableBoolean(buffer);
policyStr = buffer.readNullableSimpleString();
if (policyStr != null) {
configDeleteQueues = DeletionPolicy.valueOf(policyStr.toString());
} else {
configDeleteQueues = null;
}
autoCreateAddresses = BufferHelper.readNullableBoolean(buffer);
autoDeleteAddresses = BufferHelper.readNullableBoolean(buffer);
policyStr = buffer.readNullableSimpleString();
if (policyStr != null) {
configDeleteAddresses = DeletionPolicy.valueOf(policyStr.toString());
} else {
configDeleteAddresses = null;
}
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
@ -731,9 +780,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoCreateQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) +
BufferHelper.sizeOfNullableBoolean(autoDeleteQueues) + BufferHelper.sizeOfNullableSimpleString(configDeleteQueues != null ? configDeleteQueues.toString() : null) +
BufferHelper.sizeOfNullableBoolean(autoCreateAddresses) +
BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) +
BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) + BufferHelper.sizeOfNullableSimpleString(configDeleteAddresses != null ? configDeleteAddresses.toString() : null) +
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold) +
BufferHelper.sizeOfNullableInteger(defaultMaxConsumers) +
@ -794,10 +843,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, autoDeleteQueues);
buffer.writeNullableSimpleString(configDeleteQueues != null ? new SimpleString(configDeleteQueues.toString()) : null);
BufferHelper.writeNullableBoolean(buffer, autoCreateAddresses);
BufferHelper.writeNullableBoolean(buffer, autoDeleteAddresses);
buffer.writeNullableSimpleString(configDeleteAddresses != null ? new SimpleString(configDeleteAddresses.toString()) : null);
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
@ -843,8 +896,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode());
result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode());
result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode());
result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode());
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode());
@ -992,6 +1047,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!autoDeleteQueues.equals(other.autoDeleteQueues))
return false;
if (configDeleteQueues == null) {
if (other.configDeleteQueues != null)
return false;
} else if (!configDeleteQueues.equals(other.configDeleteQueues))
return false;
if (autoCreateAddresses == null) {
if (other.autoCreateAddresses != null)
return false;
@ -1002,6 +1062,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!autoDeleteAddresses.equals(other.autoDeleteAddresses))
return false;
if (configDeleteAddresses == null) {
if (other.configDeleteAddresses != null)
return false;
} else if (!configDeleteAddresses.equals(other.configDeleteAddresses))
return false;
if (managementBrowsePageSize == null) {
if (other.managementBrowsePageSize != null)
return false;
@ -1101,10 +1166,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoCreateQueues +
", autoDeleteQueues=" +
autoDeleteQueues +
", configDeleteQueues=" +
configDeleteQueues +
", autoCreateAddresses=" +
autoCreateAddresses +
", autoDeleteAddresses=" +
autoDeleteAddresses +
", configDeleteAddresses=" +
configDeleteAddresses +
", managementBrowsePageSize=" +
managementBrowsePageSize +
", defaultMaxConsumers=" +

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.settings.impl;
public enum DeletionPolicy {
OFF, FORCE;
}

View File

@ -2635,6 +2635,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="config-delete-queues" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do when a queue is no longer in broker.xml.
OFF = will do nothing queues will remain,
FORCE = delete queues even if messages remaining.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="FORCE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="auto-create-addresses" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -2653,6 +2669,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="config-delete-addresses" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do when an address is no longer in broker.xml.
OFF = will do nothing addresses will remain,
FORCE = delete address and its queues even if messages remaining.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="FORCE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -2787,7 +2819,6 @@
</xsd:simpleContent>
</xsd:complexType>
<!-- 2.0 Addressing configuration -->
<xsd:simpleType name="routing-type">

View File

@ -2455,6 +2455,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="config-delete-queues" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do when a queue is no longer in broker.xml.
OFF = will do nothing queues will remain,
FORCE = delete queues even if messages remaining.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="FORCE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="auto-create-addresses" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -2468,11 +2484,27 @@
<xsd:element name="auto-delete-addresses" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to delete auto-created addresses when it no longer has any queues
What to do when an address is no longer in broker.xml.
OFF = will do nothing addresses will remain,
FORCE = delete address and its queues even if messages remaining.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="config-delete-addresses" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do when an address is no longer in broker.xml. OFF = will do nothing addresses will remain, FORCE = delete queues, and address even if messages remaining.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="FORCE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1"
minOccurs="0">
<xsd:annotation>

View File

@ -577,6 +577,11 @@ are durable, non-temporary, and non-transient. Default is `true`.
delete auto-created queues when they have both 0 consumers and 0 messages.
Default is `true`.
`config-delete-queues`. How the broker should handle queues deleted
on config reload, by delete policy: `OFF` or `FORCE`.
See [config-reload](config-reload.md) for more details.
Default is `OFF`.
`auto-create-addresses`. Whether or not the broker should automatically
create an address when a message is sent to or a consumer tries to consume
from a queue which is mapped to an address whose name fits the address `match`.
@ -585,3 +590,9 @@ Default is `true`.
`auto-delete-addresses`. Whether or not the broker should automatically
delete auto-created addresses once the address no longer has any queues.
Default is `true`.
`config-delete-addresses`. How the broker should handle addresses deleted
on config reload, by delete policy: `OFF` or `FORCE`.
See [config-reload](config-reload.md) for more details.
Default is `OFF`.

View File

@ -10,4 +10,19 @@ Once the configuration file is changed (broker.xml) the following modules will b
- Addresses & queues
Notice: Address & queues won't be removed upon reload, given the risk of losing messages. You may execute explicit CLI or Management operations to remove destinations.
Notice:
Deletion of Address's and Queue's, not auto created is controlled by Address Settings
* config-delete-addresses
* OFF (DEFAULT) - will not remove upon config reload.
* FORCE - will remove the address and its queues upon config reload, even if messages remains, losing the messages in the address & queues.
* config-delete-queues
* OFF (DEFAULT) - will not remove upon config reload.
* FORCE - will remove the queue upon config reload, even if messages remains, losing the messages in the queue.
By default both settings are OFF as such address & queues won't be removed upon reload, given the risk of losing messages.
When OFF You may execute explicit CLI or Management operations to remove address & queues.

View File

@ -27,8 +27,12 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -104,4 +108,72 @@ public class RedeployTest extends ActiveMQTestBase {
}
}
@Test
public void testRedeployAddressQueue() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-address-queues.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-address-queues-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedJMS embeddedJMS = new EmbeddedJMS();
embeddedJMS.setConfigResourcePath(brokerXML.toUri().toString());
embeddedJMS.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
try {
latch.await(10, TimeUnit.SECONDS);
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertFalse(listQueuesNamesForAddress(embeddedJMS, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "permanent_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
} finally {
embeddedJMS.stop();
}
}
private AddressInfo getAddressInfo(EmbeddedJMS embeddedJMS, String address) {
return embeddedJMS.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address));
}
private List<String> listQueuesNamesForAddress(EmbeddedJMS embeddedJMS, String address) throws Exception {
return embeddedJMS.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map(
org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,128 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
<address-setting match="config_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
</multicast>
</address>
<address name="permanent_test_queue_removal">
<multicast>
<queue name="permanent_test_queue_removal_queue_1"/>
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,143 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
<address-setting match="config_#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
<queue name="config_test_queue_removal_queue_2"/>
</multicast>
</address>
<address name="config_test_address_removal">
<multicast>
<queue name="config_test_address_removal_queue"/>
</multicast>
</address>
<address name="permanent_test_queue_removal">
<multicast>
<queue name="permanent_test_queue_removal_queue_1"/>
<queue name="permanent_test_queue_removal_queue_2"/>
</multicast>
</address>
<address name="permanent_test_address_removal">
<multicast>
<queue name="permanent_test_address_removal_queue"/>
</multicast>
</address>
</addresses>
</core>
</configuration>