ARTEMIS-2901 support namespace for temporary queues

This commit is contained in:
Justin Bertram 2020-09-12 22:14:49 -05:00 committed by Clebert Suconic
parent ba902a9816
commit 6be8966164
14 changed files with 143 additions and 2 deletions

View File

@ -619,6 +619,8 @@ public final class ActiveMQDefaultConfiguration {
// Whether or not to report JVM thread metrics // Whether or not to report JVM thread metrics
private static final boolean DEFAULT_JVM_THREAD_METRICS = false; private static final boolean DEFAULT_JVM_THREAD_METRICS = false;
public static final String DEFAULT_TEMPORARY_QUEUE_NAMESPACE = "";
/** /**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/ */
@ -1692,4 +1694,7 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JVM_THREAD_METRICS; return DEFAULT_JVM_THREAD_METRICS;
} }
public static String getDefaultTemporaryQueueNamespace() {
return DEFAULT_TEMPORARY_QUEUE_NAMESPACE;
}
} }

View File

@ -1330,4 +1330,8 @@ public interface Configuration {
* @return * @return
*/ */
List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins(); List<ActiveMQServerResourcePlugin> getBrokerResourcePlugins();
String getTemporaryQueueNamespace();
Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace);
} }

View File

@ -352,6 +352,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private int pageSyncTimeout = ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(); private int pageSyncTimeout = ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio();
private String temporaryQueueNamespace = ActiveMQDefaultConfiguration.getDefaultTemporaryQueueNamespace();
/** /**
* Parent folder for all data folders. * Parent folder for all data folders.
*/ */
@ -2495,4 +2497,15 @@ public class ConfigurationImpl implements Configuration, Serializable {
} }
} }
@Override
public String getTemporaryQueueNamespace() {
return temporaryQueueNamespace;
}
@Override
public ConfigurationImpl setTemporaryQueueNamespace(final String temporaryQueueNamespace) {
this.temporaryQueueNamespace = temporaryQueueNamespace;
return this;
}
} }

View File

@ -408,6 +408,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setConfigurationFileRefreshPeriod(getLong(e, "configuration-file-refresh-period", config.getConfigurationFileRefreshPeriod(), Validators.GT_ZERO)); config.setConfigurationFileRefreshPeriod(getLong(e, "configuration-file-refresh-period", config.getConfigurationFileRefreshPeriod(), Validators.GT_ZERO));
config.setTemporaryQueueNamespace(getString(e, "temporary-queue-namespace", config.getTemporaryQueueNamespace(), Validators.NOT_NULL_OR_EMPTY));
long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1, Validators.MINUS_ONE_OR_GT_ZERO); long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1, Validators.MINUS_ONE_OR_GT_ZERO);
if (globalMaxSize > 0) { if (globalMaxSize > 0) {

View File

@ -3489,7 +3489,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(queueConfiguration.getAddress().toString())); QueueConfigurationUtils.applyDynamicQueueDefaults(queueConfiguration, addressSettingsRepository.getMatch(getRuntimeTempQueueNamespace(queueConfiguration.isTemporary()) + queueConfiguration.getAddress().toString()));
AddressInfo info = postOffice.getAddressInfo(queueConfiguration.getAddress()); AddressInfo info = postOffice.getAddressInfo(queueConfiguration.getAddress());
if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) { if (queueConfiguration.isAutoCreateAddress() || queueConfiguration.isTemporary()) {
@ -3572,6 +3572,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return queue; return queue;
} }
public String getRuntimeTempQueueNamespace(boolean temporary) {
StringBuilder runtimeTempQueueNamespace = new StringBuilder();
if (temporary && configuration.getTemporaryQueueNamespace() != null && configuration.getTemporaryQueueNamespace().length() > 0) {
runtimeTempQueueNamespace.append(configuration.getTemporaryQueueNamespace()).append(configuration.getWildcardConfiguration().getDelimiterString());
}
return runtimeTempQueueNamespace.toString();
}
private void copyRetroactiveMessages(Queue queue) throws Exception { private void copyRetroactiveMessages(Queue queue) throws Exception {
if (addressSettingsRepository.getMatch(queue.getAddress().toString()).getRetroactiveMessageCount() > 0) { if (addressSettingsRepository.getMatch(queue.getAddress().toString()).getRetroactiveMessageCount() > 0) {
Queue retroQueue = locateQueue(ResourceNames.getRetroactiveResourceQueueName(getInternalNamingPrefix(), getConfiguration().getWildcardConfiguration().getDelimiterString(), queue.getAddress(), queue.getRoutingType())); Queue retroQueue = locateQueue(ResourceNames.getRetroactiveResourceQueueName(getInternalNamingPrefix(), getConfiguration().getWildcardConfiguration().getDelimiterString(), queue.getAddress(), queue.getRoutingType()));

View File

@ -4373,7 +4373,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public void onChange() { public void onChange() {
AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); AddressSettings settings = addressSettingsRepository.getMatch(((ActiveMQServerImpl)server).getRuntimeTempQueueNamespace(temporary) + address.toString());
configureExpiry(settings); configureExpiry(settings);
checkDeadLetterAddressAndExpiryAddress(settings); checkDeadLetterAddressAndExpiryAddress(settings);
configureSlowConsumerReaper(settings); configureSlowConsumerReaper(settings);

View File

@ -310,6 +310,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="temporary-queue-namespace" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the namespace to use for looking up address settings for temporary queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="async-connection-execution-enabled" type="xsd:boolean" default="true" maxOccurs="1" <xsd:element name="async-connection-execution-enabled" type="xsd:boolean" default="true" maxOccurs="1"
minOccurs="0"> minOccurs="0">
<xsd:annotation> <xsd:annotation>

View File

@ -92,6 +92,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertNull(conf.getJournalDeviceBlockSize()); Assert.assertNull(conf.getJournalDeviceBlockSize());
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage()); Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout()); Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio(), conf.getPageSyncTimeout());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultTemporaryQueueNamespace(), conf.getTemporaryQueueNamespace());
} }
@Test @Test

View File

@ -150,6 +150,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(false, conf.isRejectEmptyValidatedUser()); Assert.assertEquals(false, conf.isRejectEmptyValidatedUser());
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval()); Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod()); Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
Assert.assertEquals("TEMP", conf.getTemporaryQueueNamespace());
Assert.assertEquals("127.0.0.1", conf.getNetworkCheckList()); Assert.assertEquals("127.0.0.1", conf.getNetworkCheckList());
Assert.assertEquals("some-nick", conf.getNetworkCheckNIC()); Assert.assertEquals("some-nick", conf.getNetworkCheckNIC());

View File

@ -58,6 +58,7 @@
<reject-empty-validated-user>false</reject-empty-validated-user> <reject-empty-validated-user>false</reject-empty-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval> <connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period> <configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
<global-max-size>1234567</global-max-size> <global-max-size>1234567</global-max-size>
<max-disk-usage>37</max-disk-usage> <max-disk-usage>37</max-disk-usage>
<disk-scan-period>123</disk-scan-period> <disk-scan-period>123</disk-scan-period>

View File

@ -59,6 +59,7 @@
<reject-empty-validated-user>false</reject-empty-validated-user> <reject-empty-validated-user>false</reject-empty-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval> <connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period> <configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
<global-max-size>1234567</global-max-size> <global-max-size>1234567</global-max-size>
<max-disk-usage>37</max-disk-usage> <max-disk-usage>37</max-disk-usage>
<disk-scan-period>123</disk-scan-period> <disk-scan-period>123</disk-scan-period>

View File

@ -294,6 +294,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="temporary-queue-namespace" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the namespace to use for looking up address settings for temporary queues
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="async-connection-execution-enabled" type="xsd:boolean" default="true" maxOccurs="1" <xsd:element name="async-connection-execution-enabled" type="xsd:boolean" default="true" maxOccurs="1"
minOccurs="0"> minOccurs="0">
<xsd:annotation> <xsd:annotation>

View File

@ -554,6 +554,41 @@ Open the file `<broker-instance>/etc/broker.xml` for editing.
Warning: Disabling all the queues on an address means that any message sent to that address will be silently dropped. Warning: Disabling all the queues on an address means that any message sent to that address will be silently dropped.
### Temporary Queues
For some protocols and APIs which only support monolithic "destinations"
without the address/queue separation (e.g. AMQP, JMS, etc.) temporary queues
are created by the broker using a UUID (i.e universally unique identifier) as
the name for both the address and the queue. Because the name is a UUID it is
impossible to create an `address-setting` for it whose `match` is anything but
`#`.
To solve this problem one can specify the `temporary-queue-namespace` in
`broker.xml` and then create an `address-setting` whose `match` value
corresponds to the configured `temporary-queue-namespace`. When the
`temporary-queue-namespace` is set and a temporary queue is created then the
broker will prepend the `temporary-queue-namespace` value along with the
`delimiter` value configured in `wildcard-addresses` (defaults to `.`) to the
address name and use that to lookup the associated `address-setting` values.
Here's a simple example configuration:
```xml
<temporary-queue-namespace>temp</temporary-queue-namespace>
<address-settings>
<address-setting match="temp.#">
<enable-metrics>false</enable-metrics>
</address-setting>
</address-settings>
```
Using this configuration any temporary queue will have metrics disabled.
> **Note:**
>
> This setting does *not* change the actual name of the temporary queue. It
> only changes the name used to *lookup* the address-settings.
## Protocol Managers ## Protocol Managers

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
public class TempQueueNamespaceTest extends SingleServerTestBase {
@Test
public void testTempQueueNamespace() throws Exception {
final String TEMP_QUEUE_NAMESPACE = "temp";
server.getConfiguration().setTemporaryQueueNamespace(TEMP_QUEUE_NAMESPACE);
server.getAddressSettingsRepository().addMatch(TEMP_QUEUE_NAMESPACE + ".#", new AddressSettings().setDefaultRingSize(10));
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true));
assertEquals(10, (long) server.locateQueue(queue).getQueueConfiguration().getRingSize());
session.close();
}
@Test
public void testTempQueueNamespaceNegative() throws Exception {
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false).setTemporary(true));
assertNotEquals(10, (long) server.locateQueue(queue).getQueueConfiguration().getRingSize());
session.close();
}
}