ARTEMIS-2888 ARTEMIS-2859 ARTEMIS-2768 - new page-store-name addressSetting to allow wildcard subscriptions share a single page store

This commit is contained in:
gtully 2020-09-16 17:56:12 +01:00
parent 312b932102
commit fa04881c6f
21 changed files with 932 additions and 12 deletions

View File

@ -109,6 +109,7 @@ public class QueueConfiguration implements Serializable {
private Boolean internal; private Boolean internal;
private Boolean _transient; private Boolean _transient;
private Boolean autoCreated; private Boolean autoCreated;
private transient SimpleString pageStoreName;
/** /**
* Instantiate this object and invoke {@link #setName(SimpleString)} * Instantiate this object and invoke {@link #setName(SimpleString)}
@ -877,4 +878,12 @@ public class QueueConfiguration implements Serializable {
+ ", transient=" + _transient + ", transient=" + _transient
+ ", autoCreated=" + autoCreated + ']'; + ", autoCreated=" + autoCreated + ']';
} }
public void setPageStoreName(SimpleString pageStoreName) {
this.pageStoreName = pageStoreName;
}
public SimpleString getPageStoreName() {
return pageStoreName != null ? pageStoreName : getAddress();
}
} }

View File

@ -296,6 +296,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String ENABLE_METRICS = "enable-metrics"; private static final String ENABLE_METRICS = "enable-metrics";
private static final String PAGE_STORE_NAME = "page-store-name";
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
@ -1266,6 +1267,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child))); addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
} else if (ENABLE_METRICS.equalsIgnoreCase(name)) { } else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child)); addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
} else if (PAGE_STORE_NAME.equalsIgnoreCase(name)) {
addressSettings.setPageStoreName(new SimpleString(getTrimmedTextContent(child)));
} }
} }
return setting; return setting;

View File

@ -336,6 +336,14 @@ public final class PagingManagerImpl implements PagingManager {
@Override @Override
public void deletePageStore(final SimpleString storeName) throws Exception { public void deletePageStore(final SimpleString storeName) throws Exception {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString());
if (addressSettings != null && addressSettings.getPageStoreName() != null) {
if (logger.isTraceEnabled()) {
logger.tracev("not deleting potentially shared pageAddress {} match for {}", addressSettings.getPageStoreName(), storeName);
}
return;
}
syncLock.readLock().lock(); syncLock.readLock().lock();
try { try {
PagingStore store = stores.remove(storeName); PagingStore store = stores.remove(storeName);
@ -352,11 +360,16 @@ public final class PagingManagerImpl implements PagingManager {
* This method creates a new store if not exist. * This method creates a new store if not exist.
*/ */
@Override @Override
public PagingStore getPageStore(final SimpleString storeName) throws Exception { public PagingStore getPageStore(SimpleString storeName) throws Exception {
if (managementAddress != null && storeName.startsWith(managementAddress)) { if (managementAddress != null && storeName.startsWith(managementAddress)) {
return null; return null;
} }
final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString());
if (addressSettings != null && addressSettings.getPageStoreName() != null) {
storeName = addressSettings.getPageStoreName();
}
PagingStore store = stores.get(storeName); PagingStore store = stores.get(storeName);
if (store != null) { if (store != null) {
return store; return store;

View File

@ -844,8 +844,6 @@ public class PagingStoreImpl implements PagingStore {
return false; return false;
} }
message.setAddress(address);
final long transactionID = tx == null ? -1 : tx.getID(); final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);

View File

@ -46,5 +46,6 @@ public class QueueConfigurationUtils {
config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount()); config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount());
config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled()); config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled());
config.setPageStoreName(as.getPageStoreName());
} }
} }

View File

@ -140,7 +140,7 @@ public class QueueFactoryImpl implements QueueFactory {
PageSubscription pageSubscription; PageSubscription pageSubscription;
try { try {
PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getAddress()); PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getPageStoreName());
if (pageStore != null) { if (pageStore != null) {
pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable()); pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable());
} else { } else {

View File

@ -253,6 +253,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Boolean enableMetrics = null; private Boolean enableMetrics = null;
private SimpleString pageStoreName = null;
//from amq5 //from amq5
//make it transient //make it transient
private transient Integer queuePrefetch = null; private transient Integer queuePrefetch = null;
@ -318,6 +320,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.defaultGroupFirstKey = other.defaultGroupFirstKey; this.defaultGroupFirstKey = other.defaultGroupFirstKey;
this.defaultRingSize = other.defaultRingSize; this.defaultRingSize = other.defaultRingSize;
this.enableMetrics = other.enableMetrics; this.enableMetrics = other.enableMetrics;
this.pageStoreName = other.pageStoreName;
} }
public AddressSettings() { public AddressSettings() {
@ -914,6 +917,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this; return this;
} }
public SimpleString getPageStoreName() {
return pageStoreName;
}
public AddressSettings setPageStoreName(final SimpleString pageStoreName) {
this.pageStoreName = pageStoreName;
return this;
}
/** /**
* merge 2 objects in to 1 * merge 2 objects in to 1
* *
@ -1107,6 +1119,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (enableMetrics == null) { if (enableMetrics == null) {
enableMetrics = merged.enableMetrics; enableMetrics = merged.enableMetrics;
} }
if (pageStoreName == null) {
pageStoreName = merged.pageStoreName;
}
} }
@Override @Override
@ -1320,6 +1335,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
} }
if (buffer.readableBytes() > 0) {
pageStoreName = buffer.readNullableSimpleString();
}
} }
@Override @Override
@ -1383,7 +1402,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
SimpleString.sizeofNullableString(expiryQueuePrefix) + SimpleString.sizeofNullableString(expiryQueuePrefix) +
SimpleString.sizeofNullableString(expiryQueueSuffix) + SimpleString.sizeofNullableString(expiryQueueSuffix) +
BufferHelper.sizeOfNullableBoolean(enableMetrics) + BufferHelper.sizeOfNullableBoolean(enableMetrics) +
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch); BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
SimpleString.sizeofNullableString(pageStoreName);
} }
@Override @Override
@ -1510,6 +1530,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
buffer.writeNullableSimpleString(pageStoreName);
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -1581,6 +1602,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode()); result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode());
result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode()); result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode());
return result; return result;
} }
@ -1928,6 +1950,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
} else if (!enableMetrics.equals(other.enableMetrics)) } else if (!enableMetrics.equals(other.enableMetrics))
return false; return false;
if (pageStoreName == null) {
if (other.pageStoreName != null)
return false;
} else if (!pageStoreName.equals(other.pageStoreName))
return false;
return true; return true;
} }
@ -2057,6 +2085,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
expiryQueueSuffix + expiryQueueSuffix +
", enableMetrics=" + ", enableMetrics=" +
enableMetrics + enableMetrics +
", pageAddress=" + pageStoreName +
"]"; "]";
} }
} }

View File

@ -3688,12 +3688,20 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="page-store-name" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the name of the page store to use, to allow the page store to coalesce for address hierarchies when wildcard routing is in play
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required"> <xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
pattern for matching settings against addresses; can use wildards pattern for matching settings against addresses; can use wildcards
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:attribute> </xsd:attribute>

View File

@ -378,6 +378,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize()); assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount()); assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics()); assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
assertNull("none fonfigured", conf.getAddressesSettings().get("a1").getPageStoreName());
assertEquals(new SimpleString("a2.shared"), conf.getAddressesSettings().get("a2").getPageStoreName());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources()); assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());

View File

@ -481,6 +481,7 @@
<default-consumer-window-size>10000</default-consumer-window-size> <default-consumer-window-size>10000</default-consumer-window-size>
<retroactive-message-count>10</retroactive-message-count> <retroactive-message-count>10</retroactive-message-count>
<enable-metrics>false</enable-metrics> <enable-metrics>false</enable-metrics>
<page-store-name>a2.shared</page-store-name>
</address-setting> </address-setting>
</address-settings> </address-settings>
<resource-limit-settings> <resource-limit-settings>

View File

@ -78,5 +78,6 @@
<default-consumer-window-size>10000</default-consumer-window-size> <default-consumer-window-size>10000</default-consumer-window-size>
<retroactive-message-count>10</retroactive-message-count> <retroactive-message-count>10</retroactive-message-count>
<enable-metrics>false</enable-metrics> <enable-metrics>false</enable-metrics>
<page-store-name>a2.shared</page-store-name>
</address-setting> </address-setting>
</address-settings> </address-settings>

View File

@ -781,6 +781,38 @@ the client-side. If the value is `BLOCK` then client message producers will
block when they try and send further messages. See the [Flow block when they try and send further messages. See the [Flow
Control](flow-control.md) and [Paging](paging.md) chapters for more info. Control](flow-control.md) and [Paging](paging.md) chapters for more info.
`page-store-name` defines the name of the shared page store for matching addresses.
It is typically unused because the page store name maps to an address name by default.
However when addresses are hierarchical and subscriptions use
[wildcards](wildcard-routing.md), this setting is **required** to support [paging](paging.md).
Subscriptions assume a single page store for cursor management and resource usage
calculations. Using an explicitly configured `page-store-name` that will match the
root address of the hierarchy, paging can coalesce to a single page store and
the required assumptions will hold.
For example, with a MULTICAST address hierarchy of:
- ticker.stock.us.apple
- ticker.stock.us.orange
- ticker.stock.eu.pear
and with wildcard subscriptions on:
- ticker.stock.#
- ticker.stock.eu.#
an address setting of:
```xml
<address-settings>
<address-setting match="ticker.stock.#">
<page-store-name>ticker.stock.#</page-store-name>
...
```
will ensure that all paged messages coalesce into a single page store named `ticker.stock.#`.
The name does not need to be the same as the `match` attribute, it can be any string value.
What **is** important is that the `match` attribute captures the root of the hierarchy that will
support wildcards subscriptions.
`message-counter-history-day-limit` is the number of days to keep message `message-counter-history-day-limit` is the number of days to keep message
counter history for this address assuming that `message-counter-enabled` is counter history for this address assuming that `message-counter-enabled` is
`true`. Default is `0`. `true`. Default is `0`.

View File

@ -20,5 +20,18 @@ This functionality is enabled by default. To turn it off add the following to th
</wildcard-addresses> </wildcard-addresses>
``` ```
## Paging with wild card addresses
Paging occurs at the address level and queue subscriptions access messages for an address through paging.
When wildcard routing is in play, it is normal for a queue to access multiple addresses and hence, potentially
multiple page stores.
To avoid the problems inherent in referencing multiple page stores, it is necessary to configure a wild card addresses
hierarchy with a single shared page store via an address setting called `page-store-name`.
```xml
<address-setting match="news.#">
<page-store-name>news-wildcard</page-store-name>
</address-setting>
```
For more information on the wild card syntax and how to configure it, take a look at [wildcard syntax](wildcard-syntax.md) chapter, For more information on the wild card syntax and how to configure it, take a look at [wildcard syntax](wildcard-syntax.md) chapter,
also see the topic hierarchy example in the [examples](examples.md). also see the topic hierarchy example in the [examples](examples.md).

View File

@ -56,10 +56,7 @@ under the License.
</goals> </goals>
<configuration> <configuration>
<ignore>${noServer}</ignore> <ignore>${noServer}</ignore>
<args> <configuration>${basedir}/target/classes/activemq/server0</configuration>
<arg>--addresses</arg>
<arg>news,news.usa,news.usa.wrestling,news.europe,news.europe.sport,news.europe.entertainment</arg>
</args>
</configuration> </configuration>
</execution> </execution>
<execution> <execution>

View File

@ -8,4 +8,14 @@ ActiveMQ Artemis wild-cards can use the character `#` which means "match any num
For example if I subscribe using the wild-card `news.europe.#`, then that would match messages sent to the addresses `news.europe`, `news.europe.sport` and `news.europe.entertainment`, but it does not match messages sent to the address `news.usa.wrestling`. For example if I subscribe using the wild-card `news.europe.#`, then that would match messages sent to the addresses `news.europe`, `news.europe.sport` and `news.europe.entertainment`, but it does not match messages sent to the address `news.usa.wrestling`.
Note that wildcard subscribers need some explicit configuration with respect to paging. The entire hierarchy needs to page to a single address such that subscribers don't race to store and account for individual messages.
Notice the address setting in broker.xml that configures matching address (the root of the hierarchy) to use the shared "news-wildcard" page store.
```xml
<address-setting match="news.#">
<page-store-name>news-wildcard</page-store-name>
</address-setting>
```
For more information on the wild-card syntax please consult the user manual. For more information on the wild-card syntax please consult the user manual.

View File

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="createAddress"/>
<permission roles="guest" type="deleteAddress"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
</security-settings>
<address-settings>
<!-- ensure that all addresses in the topic hierarchy reference a single shared page store -->
<address-setting match="news.#">
<page-store-name>news-wildcard</page-store-name>
</address-setting>
</address-settings>
<addresses>
<address name="news"/>
<address name="news.usa"/>
<address name="news.usa.wrestling"/>
<address name="news.europe"/>
<address name="news.europe.sport"/>
<address name="news.europe.entertainment"/>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.management.ObjectName;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
public class WildcardTest extends JMSTestBase {
@Parameters(name = "a={0},b={1},c={2}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {{"test.topic.A", "test.topic.B", "test.topic.#"},
{"test.topic.A", "test.topic.B", "test.#"}});
}
private String topicA;
private String topicB;
private String topicWildcard;
@Override
protected Configuration createDefaultConfig(boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(netty).setJMXManagementEnabled(true);
configuration.getAddressesSettings().put("test.#", new AddressSettings().setPageStoreName(new SimpleString("test-topic-hierarchy-root")));
return configuration;
}
public WildcardTest(String topicA, String topicB, String topicWildcard) {
super();
this.topicA = topicA;
this.topicB = topicB;
this.topicWildcard = topicWildcard;
}
@Test
public void testWildcard1Topic() throws Exception {
Session sessionA = createSession();
MessageProducer producerA = createProducer(sessionA, topicA);
MessageConsumer consumerA = createConsumer(topicA);
MessageConsumer consumerWC = createConsumer(topicWildcard);
Message message = sessionA.createObjectMessage(1);
producerA.send(message);
ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
Assert.assertNotNull(received1);
Assert.assertNotNull(received1.getObject());
ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500);
Assert.assertNotNull(received2);
Assert.assertNotNull(received2.getObject());
Assert.assertEquals(received1.getJMSMessageID(), received2.getJMSMessageID());
Assert.assertEquals(received1.getObject(), received2.getObject());
}
@Test
public void testWildcard2Topics() throws Exception {
Session sessionA = createSession();
MessageProducer producerA = createProducer(sessionA, topicA);
Session sessionB = createSession();
MessageProducer producerB = createProducer(sessionA, topicB);
MessageConsumer consumerA = createConsumer(topicA);
MessageConsumer consumerB = createConsumer(topicB);
MessageConsumer consumerWC = createConsumer(topicWildcard);
Message message1 = sessionA.createObjectMessage(1);
producerA.send(message1);
Message message2 = sessionB.createObjectMessage(2);
producerB.send(message2);
ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
Assert.assertNotNull(received1);
Assert.assertNotNull(received1.getObject());
ObjectMessage received2 = (ObjectMessage)consumerB.receive(500);
Assert.assertNotNull(received2);
Assert.assertNotNull(received2.getObject());
ObjectMessage received3 = (ObjectMessage)consumerWC.receive(500);
Assert.assertNotNull(received3);
Assert.assertNotNull(received3.getObject());
ObjectMessage received4 = (ObjectMessage)consumerWC.receive(500);
Assert.assertNotNull(received4);
Assert.assertNotNull(received4.getObject());
Assert.assertEquals(received1.getJMSMessageID(), received3.getJMSMessageID());
Assert.assertEquals(received1.getObject(), received3.getObject());
Assert.assertEquals(received2.getJMSMessageID(), received4.getJMSMessageID());
Assert.assertEquals(received2.getObject(), received4.getObject());
}
@Test
public void testNegativeAddressSizeOnWildcard1() throws Exception {
testNegativeAddressSizeOnWildcard(1);
}
@Test
public void testNegativeAddressSizeOnWildcard2() throws Exception {
testNegativeAddressSizeOnWildcard(2);
}
@Test
public void testNegativeAddressSizeOnWildcard10() throws Exception {
testNegativeAddressSizeOnWildcard(10);
}
@Test
public void testNegativeAddressSizeOnWildcard100() throws Exception {
testNegativeAddressSizeOnWildcard(100);
}
@Test
public void testNegativeAddressSizeOnWildcardAsync1() throws Exception {
testNegativeAddressSizeOnWildcardAsync(1);
}
@Test
public void testNegativeAddressSizeOnWildcardAsync2() throws Exception {
testNegativeAddressSizeOnWildcardAsync(2);
}
@Test
public void testNegativeAddressSizeOnWildcardAsync10() throws Exception {
testNegativeAddressSizeOnWildcardAsync(10);
}
@Test
public void testNegativeAddressSizeOnWildcardAsync100() throws Exception {
testNegativeAddressSizeOnWildcardAsync(100);
}
private void testNegativeAddressSizeOnWildcard(int numMessages) throws Exception {
Session sessionA = createSession();
MessageProducer producerA = createProducer(sessionA, topicA);
MessageConsumer consumerA = createConsumer(topicA);
MessageConsumer consumerWC = createConsumer(topicWildcard);
for (int i = 0; i < numMessages; i++) {
Message message = sessionA.createObjectMessage(i);
producerA.send(message);
}
for (int i = 0; i < numMessages; i++) {
ObjectMessage received1 = (ObjectMessage)consumerA.receive(500);
Assert.assertNotNull("consumerA message - " + i + " is null", received1);
Assert.assertNotNull("consumerA message - " + i + " is null", received1.getObject());
ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500);
Assert.assertNotNull("consumerWC message - " + i + " is null", received2);
Assert.assertNotNull("consumerWC message - " + i + " is null", received2.getObject());
}
long addressSizeA = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize");
long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize");
Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0);
Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0);
}
private void testNegativeAddressSizeOnWildcardAsync(int numMessages) throws Exception {
Session sessionA = createSession();
MessageProducer producerA = createProducer(sessionA, topicA);
CountDownLatch latchA = new CountDownLatch(numMessages);
MessageConsumer consumerA = createAsyncConsumer(topicA, latchA);
CountDownLatch latchWC = new CountDownLatch(numMessages);
MessageConsumer consumerWC = createAsyncConsumer(topicWildcard, latchWC);
for (int i = 0; i < numMessages; i++) {
Message message = sessionA.createObjectMessage(i);
producerA.send(message);
}
if (!latchA.await(5, TimeUnit.SECONDS)) {
Assert.fail("Waiting to receive " + latchA.getCount() + " messages on " + topicA);
}
if (!latchWC.await(5, TimeUnit.SECONDS)) {
Assert.fail("Waiting to receive " + latchWC.getCount() + " messages on " + topicWildcard);
}
long addressSizeA = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize");
long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize");
Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0);
Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0);
}
private Session createSession() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session;
}
private MessageProducer createProducer(Session session, String topicName) throws Exception {
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return producer;
}
private MessageConsumer createConsumer(String topicName) throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(topic, null, false);
return consumer;
}
private MessageConsumer createAsyncConsumer(String topicName, CountDownLatch latch) throws Exception {
MessageConsumer consumer = createConsumer(topicName);
consumer.setMessageListener(m -> {
try {
latch.countDown();
} catch (Throwable ex) {
ex.printStackTrace();
}
});
return consumer;
}
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
private int lastId;
private MqttClient subscriber;
private MqttClient sender;
private volatile LinkedList<String> topics = new LinkedList<>();
@After
public void clean() throws MqttException {
topics.clear();
if (subscriber != null && subscriber.isConnected()) {
subscriber.disconnect();
subscriber.close();
}
if (sender != null && sender.isConnected()) {
sender.disconnect();
sender.close();
}
}
@Override
protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) {
configuration.getAddressesSettings().remove("#");
configuration.getAddressesSettings().put("#", new AddressSettings().setPageSizeBytes(5).setMaxSizeBytes(10).setPageStoreName(new SimpleString("news-bag")));
configuration.setGlobalMaxSize(15);
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, 10);
}
@Test
public void testWildcardSubAutoCreateDoesNotPageToWildcardAddress() throws Exception {
server.getManagementService().enableNotifications(false);
String subscriberId = UUID.randomUUID().toString();
String senderId = UUID.randomUUID().toString();
String subscribeTo = "A.*";
String publishTo = "A.a";
subscriber = createMqttClient(subscriberId);
subscriber.subscribe(subscribeTo, 2);
subscriber.disconnect();
sender = createMqttClient(senderId);
sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false);
sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false);
assertTrue(server.getPagingManager().getPageStore(new SimpleString(subscribeTo)).isPaging());
subscriber = createMqttClient(subscriberId);
subscriber.subscribe(subscribeTo, 2);
boolean satisfied = Wait.waitFor(() -> topics.size() == 2, 5_000);
if (!satisfied) {
Assert.fail();
}
subscriber.messageArrivedComplete(lastId, 2);
subscriber.disconnect();
subscriber.close();
for (String topic : topics) {
assertEquals("A/a", topic);
}
}
private MqttClient createMqttClient(String clientId) throws MqttException {
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
client.setCallback(createCallback());
client.setManualAcks(true);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
client.connect(options);
return client;
}
private MqttCallback createCallback() {
return new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
topics.add(topic);
lastId = message.getId();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void connectionLost(Throwable cause) {
}
};
}
@Test
public void testCoreHierarchicalTopic() throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.setClientID("CLI-ID");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topicSubscribe = ActiveMQJMSClient.createTopic("news.europe.#");
MessageConsumer messageConsumer = session.createDurableConsumer(topicSubscribe, "news-eu");
MessageProducer producer = session.createProducer(null);
Topic topicNewsUsaWrestling = ActiveMQJMSClient.createTopic("news.usa.wrestling");
Topic topicNewsEuropeSport = ActiveMQJMSClient.createTopic("news.europe.sport");
Topic topicNewsEuropeEntertainment = ActiveMQJMSClient.createTopic("news.europe.entertainment");
TextMessage messageWrestlingNews = session.createTextMessage("Hulk Hogan starts ballet classes");
addSizeProp(messageWrestlingNews);
producer.send(topicNewsUsaWrestling, messageWrestlingNews);
TextMessage messageEuropeSport = session.createTextMessage("Lewis Hamilton joins European synchronized swimming team");
producer.send(topicNewsEuropeSport, messageEuropeSport);
TextMessage messageEuropeEntertainment = session.createTextMessage("John Lennon resurrected from dead");
producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
connection.start();
// second consumer to page to different address
Topic topicSubscribeAllNews = ActiveMQJMSClient.createTopic("news.#");
MessageConsumer messageConsumerAllNews = session.createDurableConsumer(topicSubscribeAllNews, "news-all");
producer.send(topicNewsUsaWrestling, messageWrestlingNews);
producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
MessageConsumer messageConsumerEuEnt = session.createDurableConsumer(topicNewsEuropeEntertainment, "news-eu-ent");
producer.send(topicNewsUsaWrestling, messageWrestlingNews);
producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment);
System.out.println("Usage " + server.getPagingManager().getGlobalSize());
TextMessage msg = (TextMessage) messageConsumerAllNews.receive(5000);
System.out.println("1 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
msg = (TextMessage) messageConsumerAllNews.receive(5000);
System.out.println("2 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
msg = (TextMessage) messageConsumerEuEnt.receive(5000);
System.out.println("3 EuEnt received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(5000);
System.out.println("4 Received message: " + messageReceived1.getText() + ", dest: " + messageReceived1.getJMSDestination());
TextMessage messageReceived2 = (TextMessage) messageConsumer.receive(5000);
System.out.println("5 Received message: " + messageReceived2.getText() + ", dest: " + messageReceived2.getJMSDestination());
// verify messageConsumer gets messageEuropeEntertainment
msg = (TextMessage) messageConsumer.receive(5000);
System.out.println("6 Eu received message: " + msg.getText() + ", dest: " + msg.getJMSDestination());
assertEquals(topicNewsEuropeSport, messageReceived1.getJMSDestination());
assertEquals(topicNewsEuropeEntertainment, messageReceived2.getJMSDestination());
assertEquals(topicNewsEuropeEntertainment, msg.getJMSDestination());
messageConsumer.close();
messageConsumerAllNews.close();
int countOfPageStores = server.getPagingManager().getStoreNames().length;
assertEquals("there should only be one", 1, countOfPageStores);
connection.close();
}
private void addSizeProp(TextMessage messageWrestlingNews) throws JMSException {
messageWrestlingNews.setStringProperty("stuff", new String(new byte[1024]));
}
}

View File

@ -233,7 +233,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
log.debugv("Added connector {} to broker", getProtocolScheme()); log.debug("Added CORE connector to broker");
} }
protected void addMQTTConnector() throws Exception { protected void addMQTTConnector() throws Exception {
@ -243,7 +243,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:"); server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:");
log.debugv("Added connector {} to broker", getProtocolScheme()); log.debug("Added MQTT connector to broker");
} }
public void stopBroker() throws Exception { public void stopBroker() throws Exception {

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.Date;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class PagingSizeWildcardTest extends ActiveMQTestBase {
@Test
public void testWildcardPageSize() throws Exception {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(new SimpleString("shared-page-store-for-a#")));
ActiveMQServer server = createServer(true, config, 200, 400);
server.start();
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
try {
Connection conn = cf.createConnection();
conn.start();
Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED);
Topic subA = sessA.createTopic("A.a");
MessageConsumer consumerA = sessA.createConsumer(subA);
Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED);
Topic subW = sessA.createTopic("A.#");
MessageConsumer consumerW = sessW.createConsumer(subW);
final int numMessages = 5;
publish(cf, numMessages);
for (int i = 0; i < numMessages; i++) {
assertNotNull(" on " + i, consumerA.receive(1000));
assertNotNull(" on " + i, consumerW.receive(1000));
}
// commit in reverse order to dispatch
sessW.commit();
sessA.commit();
for (SimpleString psName : server.getPagingManager().getStoreNames()) {
assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
}
conn.close();
} finally {
server.stop();
}
}
@Test
public void testDurableSubReveresOrderAckPageSize() throws Exception {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, 200, 400);
server.start();
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
try {
Connection conn = cf.createConnection();
conn.setClientID("IDD");
conn.start();
Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = sessA.createTopic("A.a");
MessageConsumer consumerA = sessA.createDurableConsumer(topic, "1");
Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumerW = sessW.createDurableConsumer(topic, "2");
final int numMessages = 5;
publish(cf, numMessages);
for (int i = 0; i < numMessages; i++) {
assertNotNull(" on " + i, consumerA.receive(1000));
assertNotNull(" on " + i, consumerW.receive(1000));
}
// commit in reverse order to dispatch
sessW.commit();
sessA.commit();
for (SimpleString psName : server.getPagingManager().getStoreNames()) {
assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
}
conn.close();
} finally {
server.stop();
}
}
private void publish(ActiveMQJMSConnectionFactory cf, int numMessages) throws Exception {
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subA = sess.createTopic("A.a");
MessageProducer messageProducer = sess.createProducer(subA);
for (int i = 0; i < numMessages; i++) {
messageProducer.send(sess.createTextMessage(new Date().toString()));
}
conn.close();
}
}

View File

@ -6591,6 +6591,92 @@ public class PagingTest extends ActiveMQTestBase {
server.stop(); server.stop();
} }
@Test
public void testHierarchicalPagingStoreNotDestroyed() throws Exception {
clearDataRecreateServerDirs();
final SimpleString pageAddress = new SimpleString("A.#");
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(pageAddress));
server = createServer(true, config, 100, 500);
server.start();
final int numberOfMessages = 10;
final int messageSize = 100;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
final SimpleString addressA = new SimpleString("A.a.#");
session.createQueue(new QueueConfiguration(addressA));
final SimpleString addressB = new SimpleString("A.b.#");
session.createQueue(new QueueConfiguration(addressB));
final SimpleString produceAddressA = new SimpleString("A.a.a");
ClientProducer producerA = session.createProducer(produceAddressA);
final SimpleString produceAddressB = new SimpleString("A.b.a");
ClientProducer producerB = session.createProducer(produceAddressB);
ClientMessage message = null;
byte[] body = new byte[messageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= messageSize; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producerA.send(message);
producerB.send(message);
session.commit();
}
session.commit();
producerA.close();
producerB.close();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress));
assertTrue(server.getPagingManager().getPageStore(pageAddress).isPaging());
session.deleteQueue(addressA);
session.deleteQueue(addressB);
session.close();
System.err.println("storeNames: " + Arrays.asList(server.getPagingManager().getStoreNames()));
server.getPagingManager().deletePageStore(produceAddressA);
server.getPagingManager().deletePageStore(produceAddressB);
sf.close();
locator.close();
locator = null;
sf = null;
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress));
// Ensure wildcard store is still there
server.getPagingManager().reloadStores();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress));
server.stop();
server.start();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress));
server.stop();
}
@Test @Test
public void testStopPagingWithoutConsumersIfTwoPages() throws Exception { public void testStopPagingWithoutConsumersIfTwoPages() throws Exception {
testStopPagingWithoutConsumersOnOneQueue(true); testStopPagingWithoutConsumersOnOneQueue(true);