ARTEMIS-3202 - add a flag to deleted diverts removed from config

https://issues.apache.org/jira/browse/ARTEMIS-3202
This commit is contained in:
Andy Taylor 2021-03-24 08:41:21 +00:00 committed by clebertsuconic
parent 69bea6756c
commit 658d45f543
8 changed files with 151 additions and 11 deletions

View File

@ -271,6 +271,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses";
private static final String CONFIG_DELETE_DIVERTS = "config-delete-diverts";
private static final String DEFAULT_PURGE_ON_NO_CONSUMERS = "default-purge-on-no-consumers";
private static final String DEFAULT_MAX_CONSUMERS = "default-max-consumers";
@ -1250,6 +1252,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteAddresses(policy);
} else if (CONFIG_DELETE_DIVERTS.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_DIVERTS, value);
DeletionPolicy policy = Enum.valueOf(DeletionPolicy.class, value);
addressSettings.setConfigDeleteDiverts(policy);
} else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
} else if (MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT.equalsIgnoreCase(name)) {

View File

@ -4056,20 +4056,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployDiverts() throws Exception {
if (storageManager.recoverDivertConfigurations() != null) {
for (PersistedDivertConfiguration persistedDivertConfiguration : storageManager.recoverDivertConfigurations()) {
boolean deleted = true;
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
if (persistedDivertConfiguration.getName().equals(config.getName())) {
deleted = false;
}
}
for (PersistedDivertConfiguration persistedDivertConfiguration : storageManager.recoverDivertConfigurations()) {
//has it been removed from config
boolean deleted = configuration.getDivertConfigurations().stream().noneMatch(divertConfiguration -> divertConfiguration.getName().equals(persistedDivertConfiguration.getName()));
// if it has remove it if configured to do so
if (deleted) {
//todo add a flag to specify whether to delete or not
deployDivert(persistedDivertConfiguration.getDivertConfiguration());
if (addressSettingsRepository.getMatch(persistedDivertConfiguration.getDivertConfiguration().getAddress()).getConfigDeleteDiverts() == DeletionPolicy.FORCE) {
storageManager.deleteDivertConfiguration(persistedDivertConfiguration.getName());
} else {
deployDivert(persistedDivertConfiguration.getDivertConfiguration());
}
}
}
}
//deploy the configured diverts
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
deployDivert(config);
}

View File

@ -90,6 +90,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF;
public static final DeletionPolicy DEFAULT_CONFIG_DELETE_DIVERTS = DeletionPolicy.OFF;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
public static final boolean DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES = false;
@ -223,6 +225,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private DeletionPolicy configDeleteAddresses = null;
private DeletionPolicy configDeleteDiverts = null;
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
private Long maxSizeBytesRejectThreshold = null;
@ -464,6 +468,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public AddressSettings setConfigDeleteDiverts(DeletionPolicy configDeleteDiverts) {
this.configDeleteDiverts = configDeleteDiverts;
return this;
}
public DeletionPolicy getConfigDeleteDiverts() {
return configDeleteDiverts != null ? configDeleteDiverts : AddressSettings.DEFAULT_CONFIG_DELETE_DIVERTS;
}
public int getDefaultMaxConsumers() {
return defaultMaxConsumers != null ? defaultMaxConsumers : ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
}
@ -1581,6 +1594,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
result = prime * result + ((autoDeleteAddressesDelay == null) ? 0 : autoDeleteAddressesDelay.hashCode());
result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode());
result = prime * result + ((configDeleteDiverts == null) ? 0 : configDeleteDiverts.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode());
@ -1815,6 +1829,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!configDeleteAddresses.equals(other.configDeleteAddresses))
return false;
if (configDeleteDiverts == null) {
if (other.configDeleteDiverts != null)
return false;
} else if (!configDeleteDiverts.equals(other.configDeleteDiverts))
return false;
if (managementBrowsePageSize == null) {
if (other.managementBrowsePageSize != null)
return false;
@ -2043,7 +2062,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
", autoDeleteAddressesDelay=" +
autoDeleteAddressesDelay +
", configDeleteAddresses=" +
configDeleteAddresses +
configDeleteAddresses +
", configDeleteDiverts=" +
configDeleteDiverts +
", managementBrowsePageSize=" +
managementBrowsePageSize +
", managementMessageAttributeSizeLimit=" +

View File

@ -3801,6 +3801,22 @@
</xsd:simpleType>
</xsd:element>
<xsd:element name="config-delete-diverts" default="OFF" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do when a divert is no longer in broker.xml.
OFF = will do nothing queues will remain,
FORCE = delete queues even if messages remaining.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="OFF"/>
<xsd:enumeration value="FORCE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:element>
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -672,6 +672,7 @@ that would be found in the `broker.xml` file.
<auto-delete-queues-delay>0</auto-delete-queues-delay>
<auto-delete-queues-message-count>0</auto-delete-queues-message-count>
<config-delete-queues>OFF</config-delete-queues>
<config-delete-diverts>OFF</config-delete-diverts>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-addresses>true</auto-delete-addresses>
<auto-delete-addresses-delay>0</auto-delete-addresses-delay>
@ -900,6 +901,9 @@ For Core JMS you can set it using the destination queue attributes
reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more about
[configuration reload](config-reload.md).
`config-delete-diverts`. How the broker should handle diverts deleted on config
reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more about
[configuration reload](config-reload.md).
`auto-create-addresses`. Whether or not the broker should automatically create
an address when a message is sent to or a consumer tries to consume from a
queue which is mapped to an address whose name fits the address `match`.

View File

@ -16,7 +16,7 @@ If using [modulised broker.xml](configuration-index.md#modularising-broker.xml)
**Note:**
Deletion of Address's and Queue's, not auto created is controlled by Address Settings
Deletion of Address's Queue's and diverts not auto created is controlled by Address Settings
* config-delete-addresses
* OFF (DEFAULT) - will not remove upon config reload.
@ -26,6 +26,9 @@ Deletion of Address's and Queue's, not auto created is controlled by Address Set
* OFF (DEFAULT) - will not remove upon config reload.
* FORCE - will remove the queue upon config reload, even if messages remains, losing the messages in the queue.
* config-delete-diverts
* OFF (DEFAULT) - will not remove upon config reload.
* FORCE - will remove the queue upon config reload, even if messages remains, losing the messages in the queue.
By default both settings are OFF as such address & queues won't be removed upon
reload, given the risk of losing messages.

View File

@ -249,6 +249,7 @@ Name | Description | Default
[auto-delete-addresses](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete auto-created addresses automatically | `true`
[auto-delete-addresses-delay](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delay for deleting auto-created addresses | 0
[config-delete-addresses](config-reload.md) | How to deal with addresses deleted from XML at runtime | `OFF`
[config-delete-diverts](config-reload.md) | How to deal with diverts deleted from XML at runtime | `OFF`
[management-browse-page-size]() | Number of messages a management resource can browse| 200
[default-purge-on-no-consumers](address-model.md#non-durable-subscription-queue) | `purge-on-no-consumers` value if none is set on the queue | `false`
[default-max-consumers](address-model.md#shared-durable-subscription-queue-using-max-consumers) | `max-consumers` value if none is set on the queue | -1

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.command.ActiveMQTopic;
@ -835,6 +836,92 @@ public class DivertTest extends ActiveMQTestBase {
Assert.assertEquals(divert1.getRoutingName(), new SimpleString("divert1"));
}
@Test
public void testSinglePersistedDeleteDivert() throws Exception {
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
QueueConfiguration q1 = new QueueConfiguration("forwardAddress1").setDurable(true).setRoutingType(RoutingType.ANYCAST);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setConfigDeleteDiverts(DeletionPolicy.FORCE);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addQueueConfiguration(q1);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
server.getAddressSettingsRepository().addMatch(testAddress, addressSettings);
server.start();
server.stop();
config.getDivertConfigurations().clear();
server.start();
Binding divert1 = server.getPostOffice().getBinding(new SimpleString("divert1"));
Assert.assertNull(divert1);
}
@Test
public void testMixedPersistedDeleteDivert() throws Exception {
final String testAddress = "testAddress";
final String testAddress2 = "testAddress2";
final String forwardAddress = "forwardAddress";
final String forwardAddress2 = "forwardAddress2";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
DivertConfiguration divertConf2 = new DivertConfiguration().setName("divert2").setRoutingName("divert2").setAddress(testAddress2).setForwardingAddress(forwardAddress2).setExclusive(true);
QueueConfiguration q1 = new QueueConfiguration("forwardAddress1").setDurable(true).setRoutingType(RoutingType.ANYCAST);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setConfigDeleteDiverts(DeletionPolicy.FORCE);
AddressSettings addressSettings2 = new AddressSettings();
addressSettings2.setConfigDeleteDiverts(DeletionPolicy.OFF);
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addDivertConfiguration(divertConf2).addQueueConfiguration(q1);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, true));
server.getAddressSettingsRepository().addMatch(testAddress, addressSettings);
server.getAddressSettingsRepository().addMatch(testAddress2, addressSettings2);
server.start();
server.stop();
config.getDivertConfigurations().clear();
server.start();
Binding divert1 = server.getPostOffice().getBinding(new SimpleString("divert1"));
Assert.assertNull(divert1);
Binding divert2 = server.getPostOffice().getBinding(new SimpleString("divert2"));
Assert.assertNotNull(divert2);
}
@Test
public void testMultipleNonExclusiveDivert() throws Exception {