diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java index 2257721458..1166413cdb 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java @@ -109,6 +109,7 @@ public class QueueConfiguration implements Serializable { private Boolean internal; private Boolean _transient; private Boolean autoCreated; + private transient SimpleString pageStoreName; /** * Instantiate this object and invoke {@link #setName(SimpleString)} @@ -877,4 +878,12 @@ public class QueueConfiguration implements Serializable { + ", transient=" + _transient + ", autoCreated=" + autoCreated + ']'; } + + public void setPageStoreName(SimpleString pageStoreName) { + this.pageStoreName = pageStoreName; + } + + public SimpleString getPageStoreName() { + return pageStoreName != null ? pageStoreName : getAddress(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 1a6f3d0c93..6213d75809 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -296,6 +296,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ENABLE_METRICS = "enable-metrics"; + private static final String PAGE_STORE_NAME = "page-store-name"; // Attributes ---------------------------------------------------- @@ -1266,6 +1267,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child))); } else if (ENABLE_METRICS.equalsIgnoreCase(name)) { addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child)); + } else if (PAGE_STORE_NAME.equalsIgnoreCase(name)) { + addressSettings.setPageStoreName(new SimpleString(getTrimmedTextContent(child))); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 4f930b7f0f..23828bfcdd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -336,6 +336,14 @@ public final class PagingManagerImpl implements PagingManager { @Override public void deletePageStore(final SimpleString storeName) throws Exception { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString()); + if (addressSettings != null && addressSettings.getPageStoreName() != null) { + if (logger.isTraceEnabled()) { + logger.tracev("not deleting potentially shared pageAddress {} match for {}", addressSettings.getPageStoreName(), storeName); + } + return; + } + syncLock.readLock().lock(); try { PagingStore store = stores.remove(storeName); @@ -352,11 +360,16 @@ public final class PagingManagerImpl implements PagingManager { * This method creates a new store if not exist. */ @Override - public PagingStore getPageStore(final SimpleString storeName) throws Exception { + public PagingStore getPageStore(SimpleString storeName) throws Exception { if (managementAddress != null && storeName.startsWith(managementAddress)) { return null; } + final AddressSettings addressSettings = addressSettingsRepository.getMatch(storeName.toString()); + if (addressSettings != null && addressSettings.getPageStoreName() != null) { + storeName = addressSettings.getPageStoreName(); + } + PagingStore store = stores.get(storeName); if (store != null) { return store; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index bc036adc61..6fb4797c68 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -844,8 +844,6 @@ public class PagingStoreImpl implements PagingStore { return false; } - message.setAddress(address); - final long transactionID = tx == null ? -1 : tx.getID(); PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java index 5bbf7c29ed..6f9788e08c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java @@ -46,5 +46,6 @@ public class QueueConfigurationUtils { config.setAutoDeleteMessageCount(config.getAutoDeleteMessageCount() == null ? as.getAutoDeleteQueuesMessageCount() : config.getAutoDeleteMessageCount()); config.setEnabled(config.isEnabled() == null ? ActiveMQDefaultConfiguration.getDefaultEnabled() : config.isEnabled()); + config.setPageStoreName(as.getPageStoreName()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 13e3c313b3..9b616ff41a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -140,7 +140,7 @@ public class QueueFactoryImpl implements QueueFactory { PageSubscription pageSubscription; try { - PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getAddress()); + PagingStore pageStore = pagingManager.getPageStore(queueConfiguration.getPageStoreName()); if (pageStore != null) { pageSubscription = pageStore.getCursorProvider().createSubscription(queueConfiguration.getId(), FilterImpl.createFilter(queueConfiguration.getFilterString()), queueConfiguration.isDurable()); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index e713b08f54..65da14b300 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -253,6 +253,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean enableMetrics = null; + private SimpleString pageStoreName = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -318,6 +320,7 @@ public class AddressSettings implements Mergeable, Serializable this.defaultGroupFirstKey = other.defaultGroupFirstKey; this.defaultRingSize = other.defaultRingSize; this.enableMetrics = other.enableMetrics; + this.pageStoreName = other.pageStoreName; } public AddressSettings() { @@ -914,6 +917,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public SimpleString getPageStoreName() { + return pageStoreName; + } + + public AddressSettings setPageStoreName(final SimpleString pageStoreName) { + this.pageStoreName = pageStoreName; + return this; + } + /** * merge 2 objects in to 1 * @@ -1107,6 +1119,9 @@ public class AddressSettings implements Mergeable, Serializable if (enableMetrics == null) { enableMetrics = merged.enableMetrics; } + if (pageStoreName == null) { + pageStoreName = merged.pageStoreName; + } } @Override @@ -1320,6 +1335,10 @@ public class AddressSettings implements Mergeable, Serializable defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + pageStoreName = buffer.readNullableSimpleString(); + } + } @Override @@ -1383,7 +1402,8 @@ public class AddressSettings implements Mergeable, Serializable SimpleString.sizeofNullableString(expiryQueuePrefix) + SimpleString.sizeofNullableString(expiryQueueSuffix) + BufferHelper.sizeOfNullableBoolean(enableMetrics) + - BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch); + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + + SimpleString.sizeofNullableString(pageStoreName); } @Override @@ -1510,6 +1530,7 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); + buffer.writeNullableSimpleString(pageStoreName); } /* (non-Javadoc) @@ -1581,6 +1602,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode()); result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode()); result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); + result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode()); return result; } @@ -1928,6 +1950,12 @@ public class AddressSettings implements Mergeable, Serializable } else if (!enableMetrics.equals(other.enableMetrics)) return false; + if (pageStoreName == null) { + if (other.pageStoreName != null) + return false; + } else if (!pageStoreName.equals(other.pageStoreName)) + return false; + return true; } @@ -2057,6 +2085,7 @@ public class AddressSettings implements Mergeable, Serializable expiryQueueSuffix + ", enableMetrics=" + enableMetrics + + ", pageAddress=" + pageStoreName + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 7b99be9708..0e0ea15d50 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3687,13 +3687,21 @@ + + + + + the name of the page store to use, to allow the page store to coalesce for address hierarchies when wildcard routing is in play + + + - pattern for matching settings against addresses; can use wildards + pattern for matching settings against addresses; can use wildcards diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 583b45045b..cab5752314 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -378,6 +378,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize()); assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount()); assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics()); + assertNull("none fonfigured", conf.getAddressesSettings().get("a1").getPageStoreName()); + assertEquals(new SimpleString("a2.shared"), conf.getAddressesSettings().get("a2").getPageStoreName()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 0bde4f2d2c..1075602851 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -481,6 +481,7 @@ 10000 10 false + a2.shared diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 5be0f08c01..83267f31b6 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -78,5 +78,6 @@ 10000 10 false + a2.shared \ No newline at end of file diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index 64e9acc02c..369e6fdd1e 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -781,6 +781,38 @@ the client-side. If the value is `BLOCK` then client message producers will block when they try and send further messages. See the [Flow Control](flow-control.md) and [Paging](paging.md) chapters for more info. +`page-store-name` defines the name of the shared page store for matching addresses. +It is typically unused because the page store name maps to an address name by default. +However when addresses are hierarchical and subscriptions use +[wildcards](wildcard-routing.md), this setting is **required** to support [paging](paging.md). +Subscriptions assume a single page store for cursor management and resource usage +calculations. Using an explicitly configured `page-store-name` that will match the +root address of the hierarchy, paging can coalesce to a single page store and +the required assumptions will hold. + +For example, with a MULTICAST address hierarchy of: + - ticker.stock.us.apple + - ticker.stock.us.orange + - ticker.stock.eu.pear + + and with wildcard subscriptions on: + - ticker.stock.# + - ticker.stock.eu.# + + an address setting of: + + ```xml + + + ticker.stock.# + ... + ``` + will ensure that all paged messages coalesce into a single page store named `ticker.stock.#`. + The name does not need to be the same as the `match` attribute, it can be any string value. + What **is** important is that the `match` attribute captures the root of the hierarchy that will + support wildcards subscriptions. + + `message-counter-history-day-limit` is the number of days to keep message counter history for this address assuming that `message-counter-enabled` is `true`. Default is `0`. diff --git a/docs/user-manual/en/wildcard-routing.md b/docs/user-manual/en/wildcard-routing.md index 0db7748543..7fef8f3b3c 100644 --- a/docs/user-manual/en/wildcard-routing.md +++ b/docs/user-manual/en/wildcard-routing.md @@ -20,5 +20,18 @@ This functionality is enabled by default. To turn it off add the following to th ``` +## Paging with wild card addresses +Paging occurs at the address level and queue subscriptions access messages for an address through paging. +When wildcard routing is in play, it is normal for a queue to access multiple addresses and hence, potentially +multiple page stores. +To avoid the problems inherent in referencing multiple page stores, it is necessary to configure a wild card addresses +hierarchy with a single shared page store via an address setting called `page-store-name`. + +```xml + + news-wildcard + +``` + For more information on the wild card syntax and how to configure it, take a look at [wildcard syntax](wildcard-syntax.md) chapter, also see the topic hierarchy example in the [examples](examples.md). diff --git a/examples/features/standard/topic-hierarchies/pom.xml b/examples/features/standard/topic-hierarchies/pom.xml index 0079923e46..f830461810 100644 --- a/examples/features/standard/topic-hierarchies/pom.xml +++ b/examples/features/standard/topic-hierarchies/pom.xml @@ -56,10 +56,7 @@ under the License. ${noServer} - - --addresses - news,news.usa,news.usa.wrestling,news.europe,news.europe.sport,news.europe.entertainment - + ${basedir}/target/classes/activemq/server0 diff --git a/examples/features/standard/topic-hierarchies/readme.md b/examples/features/standard/topic-hierarchies/readme.md index 2f70bc719b..9f3adc00ca 100644 --- a/examples/features/standard/topic-hierarchies/readme.md +++ b/examples/features/standard/topic-hierarchies/readme.md @@ -8,4 +8,14 @@ ActiveMQ Artemis wild-cards can use the character `#` which means "match any num For example if I subscribe using the wild-card `news.europe.#`, then that would match messages sent to the addresses `news.europe`, `news.europe.sport` and `news.europe.entertainment`, but it does not match messages sent to the address `news.usa.wrestling`. +Note that wildcard subscribers need some explicit configuration with respect to paging. The entire hierarchy needs to page to a single address such that subscribers don't race to store and account for individual messages. + +Notice the address setting in broker.xml that configures matching address (the root of the hierarchy) to use the shared "news-wildcard" page store. + +```xml + + news-wildcard + +``` + For more information on the wild-card syntax please consult the user manual. \ No newline at end of file diff --git a/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..32e69349d0 --- /dev/null +++ b/examples/features/standard/topic-hierarchies/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,68 @@ + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61616 + + + + + + + + + + + + + + + + + + + + + + news-wildcard + + + + +
+
+
+
+
+
+ + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java new file mode 100644 index 0000000000..08b37d5de2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/WildcardTest.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.ObjectName; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@RunWith(Parameterized.class) +public class WildcardTest extends JMSTestBase { + + @Parameters(name = "a={0},b={1},c={2}") + public static Iterable data() { + return Arrays.asList(new Object[][] {{"test.topic.A", "test.topic.B", "test.topic.#"}, + {"test.topic.A", "test.topic.B", "test.#"}}); + } + + private String topicA; + private String topicB; + private String topicWildcard; + + @Override + protected Configuration createDefaultConfig(boolean netty) throws Exception { + Configuration configuration = super.createDefaultConfig(netty).setJMXManagementEnabled(true); + configuration.getAddressesSettings().put("test.#", new AddressSettings().setPageStoreName(new SimpleString("test-topic-hierarchy-root"))); + return configuration; + } + + public WildcardTest(String topicA, String topicB, String topicWildcard) { + super(); + + this.topicA = topicA; + this.topicB = topicB; + this.topicWildcard = topicWildcard; + } + + @Test + public void testWildcard1Topic() throws Exception { + Session sessionA = createSession(); + MessageProducer producerA = createProducer(sessionA, topicA); + + MessageConsumer consumerA = createConsumer(topicA); + MessageConsumer consumerWC = createConsumer(topicWildcard); + + Message message = sessionA.createObjectMessage(1); + producerA.send(message); + + ObjectMessage received1 = (ObjectMessage)consumerA.receive(500); + Assert.assertNotNull(received1); + Assert.assertNotNull(received1.getObject()); + + ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500); + Assert.assertNotNull(received2); + Assert.assertNotNull(received2.getObject()); + + Assert.assertEquals(received1.getJMSMessageID(), received2.getJMSMessageID()); + Assert.assertEquals(received1.getObject(), received2.getObject()); + } + + @Test + public void testWildcard2Topics() throws Exception { + Session sessionA = createSession(); + MessageProducer producerA = createProducer(sessionA, topicA); + + Session sessionB = createSession(); + MessageProducer producerB = createProducer(sessionA, topicB); + + MessageConsumer consumerA = createConsumer(topicA); + MessageConsumer consumerB = createConsumer(topicB); + MessageConsumer consumerWC = createConsumer(topicWildcard); + + Message message1 = sessionA.createObjectMessage(1); + producerA.send(message1); + + Message message2 = sessionB.createObjectMessage(2); + producerB.send(message2); + + ObjectMessage received1 = (ObjectMessage)consumerA.receive(500); + Assert.assertNotNull(received1); + Assert.assertNotNull(received1.getObject()); + + ObjectMessage received2 = (ObjectMessage)consumerB.receive(500); + Assert.assertNotNull(received2); + Assert.assertNotNull(received2.getObject()); + + ObjectMessage received3 = (ObjectMessage)consumerWC.receive(500); + Assert.assertNotNull(received3); + Assert.assertNotNull(received3.getObject()); + + ObjectMessage received4 = (ObjectMessage)consumerWC.receive(500); + Assert.assertNotNull(received4); + Assert.assertNotNull(received4.getObject()); + + Assert.assertEquals(received1.getJMSMessageID(), received3.getJMSMessageID()); + Assert.assertEquals(received1.getObject(), received3.getObject()); + + Assert.assertEquals(received2.getJMSMessageID(), received4.getJMSMessageID()); + Assert.assertEquals(received2.getObject(), received4.getObject()); + } + + @Test + public void testNegativeAddressSizeOnWildcard1() throws Exception { + testNegativeAddressSizeOnWildcard(1); + } + + @Test + public void testNegativeAddressSizeOnWildcard2() throws Exception { + testNegativeAddressSizeOnWildcard(2); + } + + @Test + public void testNegativeAddressSizeOnWildcard10() throws Exception { + testNegativeAddressSizeOnWildcard(10); + } + + @Test + public void testNegativeAddressSizeOnWildcard100() throws Exception { + testNegativeAddressSizeOnWildcard(100); + } + + @Test + public void testNegativeAddressSizeOnWildcardAsync1() throws Exception { + testNegativeAddressSizeOnWildcardAsync(1); + } + + @Test + public void testNegativeAddressSizeOnWildcardAsync2() throws Exception { + testNegativeAddressSizeOnWildcardAsync(2); + } + + @Test + public void testNegativeAddressSizeOnWildcardAsync10() throws Exception { + testNegativeAddressSizeOnWildcardAsync(10); + } + + @Test + public void testNegativeAddressSizeOnWildcardAsync100() throws Exception { + testNegativeAddressSizeOnWildcardAsync(100); + } + + private void testNegativeAddressSizeOnWildcard(int numMessages) throws Exception { + Session sessionA = createSession(); + MessageProducer producerA = createProducer(sessionA, topicA); + + MessageConsumer consumerA = createConsumer(topicA); + MessageConsumer consumerWC = createConsumer(topicWildcard); + + for (int i = 0; i < numMessages; i++) { + Message message = sessionA.createObjectMessage(i); + producerA.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ObjectMessage received1 = (ObjectMessage)consumerA.receive(500); + Assert.assertNotNull("consumerA message - " + i + " is null", received1); + Assert.assertNotNull("consumerA message - " + i + " is null", received1.getObject()); + + ObjectMessage received2 = (ObjectMessage)consumerWC.receive(500); + Assert.assertNotNull("consumerWC message - " + i + " is null", received2); + Assert.assertNotNull("consumerWC message - " + i + " is null", received2.getObject()); + } + + long addressSizeA = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize"); + long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize"); + + Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0); + Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0); + } + + private void testNegativeAddressSizeOnWildcardAsync(int numMessages) throws Exception { + Session sessionA = createSession(); + MessageProducer producerA = createProducer(sessionA, topicA); + + CountDownLatch latchA = new CountDownLatch(numMessages); + MessageConsumer consumerA = createAsyncConsumer(topicA, latchA); + + CountDownLatch latchWC = new CountDownLatch(numMessages); + MessageConsumer consumerWC = createAsyncConsumer(topicWildcard, latchWC); + + for (int i = 0; i < numMessages; i++) { + Message message = sessionA.createObjectMessage(i); + + producerA.send(message); + } + + if (!latchA.await(5, TimeUnit.SECONDS)) { + Assert.fail("Waiting to receive " + latchA.getCount() + " messages on " + topicA); + } + + if (!latchWC.await(5, TimeUnit.SECONDS)) { + Assert.fail("Waiting to receive " + latchWC.getCount() + " messages on " + topicWildcard); + } + + long addressSizeA = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicA + "\""), "AddressSize"); + long addressSizeWC = (Long)mbeanServer.getAttribute(new ObjectName("org.apache.activemq.artemis:broker=\"localhost\",component=addresses,address=\"" + topicWildcard + "\""), "AddressSize"); + + Assert.assertTrue(topicA + " AddressSize < 0", addressSizeA >= 0); + Assert.assertTrue(topicWildcard + " AddressSize < 0", addressSizeWC >= 0); + } + + private Session createSession() throws Exception { + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + return session; + } + + private MessageProducer createProducer(Session session, String topicName) throws Exception { + Topic topic = session.createTopic(topicName); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + return producer; + } + + private MessageConsumer createConsumer(String topicName) throws Exception { + Connection connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + + MessageConsumer consumer = session.createConsumer(topic, null, false); + + return consumer; + } + + private MessageConsumer createAsyncConsumer(String topicName, CountDownLatch latch) throws Exception { + MessageConsumer consumer = createConsumer(topicName); + consumer.setMessageListener(m -> { + try { + latch.countDown(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + }); + + return consumer; + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java new file mode 100644 index 0000000000..03a66291ae --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.LinkedList; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.jgroups.util.UUID; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + + +public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport { + + private int lastId; + private MqttClient subscriber; + private MqttClient sender; + private volatile LinkedList topics = new LinkedList<>(); + + @After + public void clean() throws MqttException { + topics.clear(); + if (subscriber != null && subscriber.isConnected()) { + subscriber.disconnect(); + subscriber.close(); + } + if (sender != null && sender.isConnected()) { + sender.disconnect(); + sender.close(); + } + } + + @Override + protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) { + configuration.getAddressesSettings().remove("#"); + configuration.getAddressesSettings().put("#", new AddressSettings().setPageSizeBytes(5).setMaxSizeBytes(10).setPageStoreName(new SimpleString("news-bag"))); + configuration.setGlobalMaxSize(15); + return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, 10); + } + + @Test + public void testWildcardSubAutoCreateDoesNotPageToWildcardAddress() throws Exception { + + server.getManagementService().enableNotifications(false); + + String subscriberId = UUID.randomUUID().toString(); + String senderId = UUID.randomUUID().toString(); + String subscribeTo = "A.*"; + String publishTo = "A.a"; + + subscriber = createMqttClient(subscriberId); + subscriber.subscribe(subscribeTo, 2); + + subscriber.disconnect(); + + sender = createMqttClient(senderId); + sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false); + sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false); + + assertTrue(server.getPagingManager().getPageStore(new SimpleString(subscribeTo)).isPaging()); + + subscriber = createMqttClient(subscriberId); + subscriber.subscribe(subscribeTo, 2); + + boolean satisfied = Wait.waitFor(() -> topics.size() == 2, 5_000); + if (!satisfied) { + Assert.fail(); + } + + subscriber.messageArrivedComplete(lastId, 2); + subscriber.disconnect(); + subscriber.close(); + + for (String topic : topics) { + assertEquals("A/a", topic); + } + + } + + private MqttClient createMqttClient(String clientId) throws MqttException { + MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); + client.setCallback(createCallback()); + client.setManualAcks(true); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(false); + client.connect(options); + return client; + } + + private MqttCallback createCallback() { + return new MqttCallback() { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + topics.add(topic); + lastId = message.getId(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + @Override + public void connectionLost(Throwable cause) { + } + }; + } + + @Test + public void testCoreHierarchicalTopic() throws Exception { + ConnectionFactory cf = new ActiveMQConnectionFactory(); + + Connection connection = cf.createConnection(); + connection.setClientID("CLI-ID"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topicSubscribe = ActiveMQJMSClient.createTopic("news.europe.#"); + + MessageConsumer messageConsumer = session.createDurableConsumer(topicSubscribe, "news-eu"); + + MessageProducer producer = session.createProducer(null); + + Topic topicNewsUsaWrestling = ActiveMQJMSClient.createTopic("news.usa.wrestling"); + Topic topicNewsEuropeSport = ActiveMQJMSClient.createTopic("news.europe.sport"); + Topic topicNewsEuropeEntertainment = ActiveMQJMSClient.createTopic("news.europe.entertainment"); + + TextMessage messageWrestlingNews = session.createTextMessage("Hulk Hogan starts ballet classes"); + addSizeProp(messageWrestlingNews); + producer.send(topicNewsUsaWrestling, messageWrestlingNews); + + TextMessage messageEuropeSport = session.createTextMessage("Lewis Hamilton joins European synchronized swimming team"); + producer.send(topicNewsEuropeSport, messageEuropeSport); + + TextMessage messageEuropeEntertainment = session.createTextMessage("John Lennon resurrected from dead"); + producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment); + + connection.start(); + + // second consumer to page to different address + Topic topicSubscribeAllNews = ActiveMQJMSClient.createTopic("news.#"); + + MessageConsumer messageConsumerAllNews = session.createDurableConsumer(topicSubscribeAllNews, "news-all"); + + producer.send(topicNewsUsaWrestling, messageWrestlingNews); + producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment); + + MessageConsumer messageConsumerEuEnt = session.createDurableConsumer(topicNewsEuropeEntertainment, "news-eu-ent"); + + producer.send(topicNewsUsaWrestling, messageWrestlingNews); + producer.send(topicNewsEuropeEntertainment, messageEuropeEntertainment); + + System.out.println("Usage " + server.getPagingManager().getGlobalSize()); + + TextMessage msg = (TextMessage) messageConsumerAllNews.receive(5000); + + System.out.println("1 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination()); + + msg = (TextMessage) messageConsumerAllNews.receive(5000); + + System.out.println("2 All received message: " + msg.getText() + ", dest: " + msg.getJMSDestination()); + + msg = (TextMessage) messageConsumerEuEnt.receive(5000); + + System.out.println("3 EuEnt received message: " + msg.getText() + ", dest: " + msg.getJMSDestination()); + + TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(5000); + + System.out.println("4 Received message: " + messageReceived1.getText() + ", dest: " + messageReceived1.getJMSDestination()); + + TextMessage messageReceived2 = (TextMessage) messageConsumer.receive(5000); + + System.out.println("5 Received message: " + messageReceived2.getText() + ", dest: " + messageReceived2.getJMSDestination()); + + // verify messageConsumer gets messageEuropeEntertainment + msg = (TextMessage) messageConsumer.receive(5000); + + System.out.println("6 Eu received message: " + msg.getText() + ", dest: " + msg.getJMSDestination()); + + assertEquals(topicNewsEuropeSport, messageReceived1.getJMSDestination()); + assertEquals(topicNewsEuropeEntertainment, messageReceived2.getJMSDestination()); + assertEquals(topicNewsEuropeEntertainment, msg.getJMSDestination()); + + messageConsumer.close(); + messageConsumerAllNews.close(); + + int countOfPageStores = server.getPagingManager().getStoreNames().length; + assertEquals("there should only be one", 1, countOfPageStores); + + connection.close(); + } + + private void addSizeProp(TextMessage messageWrestlingNews) throws JMSException { + messageWrestlingNews.setStringProperty("stuff", new String(new byte[1024])); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 30665114b1..8972dae48a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -233,7 +233,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); - log.debugv("Added connector {} to broker", getProtocolScheme()); + log.debug("Added CORE connector to broker"); } protected void addMQTTConnector() throws Exception { @@ -243,7 +243,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:"); - log.debugv("Added connector {} to broker", getProtocolScheme()); + log.debug("Added MQTT connector to broker"); } public void stopBroker() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java new file mode 100644 index 0000000000..a1e64c60c3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSizeWildcardTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.Date; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +public class PagingSizeWildcardTest extends ActiveMQTestBase { + + @Test + public void testWildcardPageSize() throws Exception { + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(new SimpleString("shared-page-store-for-a#"))); + + ActiveMQServer server = createServer(true, config, 200, 400); + server.start(); + + ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + try { + Connection conn = cf.createConnection(); + conn.start(); + Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED); + Topic subA = sessA.createTopic("A.a"); + MessageConsumer consumerA = sessA.createConsumer(subA); + + Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED); + Topic subW = sessA.createTopic("A.#"); + MessageConsumer consumerW = sessW.createConsumer(subW); + + final int numMessages = 5; + publish(cf, numMessages); + + for (int i = 0; i < numMessages; i++) { + assertNotNull(" on " + i, consumerA.receive(1000)); + assertNotNull(" on " + i, consumerW.receive(1000)); + } + + // commit in reverse order to dispatch + sessW.commit(); + sessA.commit(); + + for (SimpleString psName : server.getPagingManager().getStoreNames()) { + assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0); + } + conn.close(); + + } finally { + server.stop(); + } + } + + + @Test + public void testDurableSubReveresOrderAckPageSize() throws Exception { + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + ActiveMQServer server = createServer(true, config, 200, 400); + server.start(); + + ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + try { + Connection conn = cf.createConnection(); + conn.setClientID("IDD"); + conn.start(); + + Session sessA = conn.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = sessA.createTopic("A.a"); + MessageConsumer consumerA = sessA.createDurableConsumer(topic, "1"); + + Session sessW = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumerW = sessW.createDurableConsumer(topic, "2"); + + final int numMessages = 5; + publish(cf, numMessages); + + for (int i = 0; i < numMessages; i++) { + assertNotNull(" on " + i, consumerA.receive(1000)); + assertNotNull(" on " + i, consumerW.receive(1000)); + } + + // commit in reverse order to dispatch + sessW.commit(); + sessA.commit(); + + for (SimpleString psName : server.getPagingManager().getStoreNames()) { + assertTrue("non negative size: " + psName, server.getPagingManager().getPageStore(psName).getAddressSize() >= 0); + } + conn.close(); + + } finally { + server.stop(); + } + } + + private void publish(ActiveMQJMSConnectionFactory cf, int numMessages) throws Exception { + Connection conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic subA = sess.createTopic("A.a"); + MessageProducer messageProducer = sess.createProducer(subA); + + for (int i = 0; i < numMessages; i++) { + messageProducer.send(sess.createTextMessage(new Date().toString())); + } + conn.close(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index bbf96abc61..644adff791 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -6591,6 +6591,92 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); } + @Test + public void testHierarchicalPagingStoreNotDestroyed() throws Exception { + clearDataRecreateServerDirs(); + + final SimpleString pageAddress = new SimpleString("A.#"); + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + config.getAddressesSettings().put("A.#", new AddressSettings().setPageStoreName(pageAddress)); + + server = createServer(true, config, 100, 500); + + server.start(); + + final int numberOfMessages = 10; + final int messageSize = 100; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + final SimpleString addressA = new SimpleString("A.a.#"); + session.createQueue(new QueueConfiguration(addressA)); + + final SimpleString addressB = new SimpleString("A.b.#"); + session.createQueue(new QueueConfiguration(addressB)); + + final SimpleString produceAddressA = new SimpleString("A.a.a"); + ClientProducer producerA = session.createProducer(produceAddressA); + + final SimpleString produceAddressB = new SimpleString("A.b.a"); + ClientProducer producerB = session.createProducer(produceAddressB); + + ClientMessage message = null; + + byte[] body = new byte[messageSize]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= messageSize; j++) { + bb.put(getSamplebyte(j)); + } + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producerA.send(message); + producerB.send(message); + session.commit(); + } + session.commit(); + producerA.close(); + producerB.close(); + + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress)); + assertTrue(server.getPagingManager().getPageStore(pageAddress).isPaging()); + + session.deleteQueue(addressA); + session.deleteQueue(addressB); + + session.close(); + + System.err.println("storeNames: " + Arrays.asList(server.getPagingManager().getStoreNames())); + + server.getPagingManager().deletePageStore(produceAddressA); + server.getPagingManager().deletePageStore(produceAddressB); + + sf.close(); + locator.close(); + locator = null; + sf = null; + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress)); + // Ensure wildcard store is still there + server.getPagingManager().reloadStores(); + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress)); + server.stop(); + + server.start(); + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(pageAddress)); + server.stop(); + } + @Test public void testStopPagingWithoutConsumersIfTwoPages() throws Exception { testStopPagingWithoutConsumersOnOneQueue(true);