From c5f0e3400ccad096350bbd8e6681563177331956 Mon Sep 17 00:00:00 2001 From: Mike Artz Date: Tue, 25 Jul 2023 16:43:21 -0500 Subject: [PATCH] ARTEMIS-4159 Support duplicate cache size configuration per address This commit introduces support for configuring a specific Duplicate ID cache size per address in the Artemis server. Previously, there was only a global setting for the ID cache size, but now each address can have its own cache size. The changes include the addition of a new configuration property id-cache-size in the Artemis server configuration file. This property can now be specified under each address setting in the configuration file, and its value will determine the Duplicate ID cache size for that particular address. If the id-cache-size property is not specified for an address, it will use the global setting. The test cases have been updated to cover this new functionality, and integration test have been added to verify that address-specific cache sizes work as expected. Documentation has been added to address-settings.adoc, configuration-index.adoc and duplicate-detection.adoc --- .../impl/FileConfigurationParser.java | 6 ++ .../core/postoffice/impl/PostOfficeImpl.java | 8 +- .../core/settings/impl/AddressSettings.java | 32 ++++++++ .../schema/artemis-configuration.xsd | 9 ++ .../config/impl/ConfigurationImplTest.java | 2 + .../config/impl/FileConfigurationTest.java | 2 + .../core/settings/AddressSettingsTest.java | 2 + .../ConfigurationTest-full-config.xml | 1 + ...nTest-xinclude-config-address-settings.xml | 1 + ...include-schema-config-address-settings.xml | 1 + docs/user-manual/address-settings.adoc | 7 ++ docs/user-manual/configuration-index.adoc | 4 + docs/user-manual/duplicate-detection.adoc | 7 ++ .../integration/DuplicateDetectionTest.java | 82 +++++++++++++++++++ 14 files changed, 163 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index b7d28d29a5..e9eb2874ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -342,6 +342,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp"; + private static final String ID_CACHE_SIZE = "id-cache-size"; + private boolean validateAIO = false; private boolean printPageMaxSizeUsed = false; @@ -1456,6 +1458,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child)); } else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) { addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child)); + } else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) { + int idCacheSize = XMLUtil.parseInt(child); + Validators.GE_ZERO.validate(ID_CACHE_SIZE, idCacheSize); + addressSettings.setIDCacheSize(XMLUtil.parseInt(child)); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 7b7e68c336..738fbef3f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1407,9 +1407,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return null; } + private int resolveIdCacheSize(SimpleString address) { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + return addressSettings.getIDCacheSize() == null ? idCacheSize : addressSettings.getIDCacheSize(); + } + @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { - return getDuplicateIDCache(address, idCacheSize); + int resolvedIdCacheSize = resolveIdCacheSize(address); + return getDuplicateIDCache(address, resolvedIdCacheSize); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 202b082902..aee8c2fe9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -293,6 +293,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean enableIngressTimestamp = null; + private Integer idCacheSize = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -370,6 +372,7 @@ public class AddressSettings implements Mergeable, Serializable this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit; this.enableIngressTimestamp = other.enableIngressTimestamp; + this.idCacheSize = other.idCacheSize; } public AddressSettings() { @@ -1073,6 +1076,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public Integer getIDCacheSize() { + return idCacheSize; + } + + public AddressSettings setIDCacheSize(int idCacheSize) { + this.idCacheSize = idCacheSize; + return this; + } + /** * merge 2 objects in to 1 * @@ -1302,6 +1314,9 @@ public class AddressSettings implements Mergeable, Serializable if (pageLimitMessages == null) { pageLimitMessages = merged.pageLimitMessages; } + if (idCacheSize == null) { + idCacheSize = merged.idCacheSize; + } } @Override @@ -1577,6 +1592,10 @@ public class AddressSettings implements Mergeable, Serializable if (buffer.readableBytes() > 0) { autoDeleteAddressesSkipUsageCheck = BufferHelper.readNullableBoolean(buffer); } + + if (buffer.readableBytes() > 0) { + idCacheSize = BufferHelper.readNullableInteger(buffer); + } } @Override @@ -1652,6 +1671,7 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableInteger(maxReadPageBytes) + BufferHelper.sizeOfNullableLong(pageLimitBytes) + BufferHelper.sizeOfNullableLong(pageLimitMessages) + + BufferHelper.sizeOfNullableInteger(idCacheSize) + BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null); } @@ -1802,6 +1822,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, autoDeleteQueuesSkipUsageCheck); BufferHelper.writeNullableBoolean(buffer, autoDeleteAddressesSkipUsageCheck); + + BufferHelper.writeNullableInteger(buffer, idCacheSize); } /* (non-Javadoc) @@ -1883,6 +1905,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((pageLimitBytes == null) ? 0 : pageLimitBytes.hashCode()); result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode()); result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode()); + result = prime * result + ((idCacheSize == null) ? 0 : idCacheSize.hashCode()); return result; } @@ -2290,6 +2313,13 @@ public class AddressSettings implements Mergeable, Serializable return false; } + if (idCacheSize == null) { + if (other.idCacheSize != null) { + return false; + } + } else if (!idCacheSize.equals(other.idCacheSize)) { + return false; + } return true; } @@ -2438,6 +2468,8 @@ public class AddressSettings implements Mergeable, Serializable pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + + ", idCacheSize=" + + idCacheSize + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5b23f0ffff..37144f8ba4 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -4281,6 +4281,15 @@ + + + + This will set the Duplicate ID cache size for the matching address. By default the global + setting for `id-cache-size` will be used. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 5c71d0cece..5950b9d0c3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -1120,6 +1120,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase { properties.put("addressSettings.NeedToSet.autoDeleteCreatedQueues", "true"); properties.put("addressSettings.NeedToSet.defaultExclusiveQueue", "true"); properties.put("addressSettings.NeedToSet.defaultMaxConsumers", 10); + properties.put("addressSettings.NeedToSet.iDCacheSize", 10); configuration.parsePrefixedProperties(properties, null); @@ -1185,6 +1186,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isAutoDeleteCreatedQueues()); Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isDefaultExclusiveQueue()); Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getDefaultMaxConsumers()); + Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getIDCacheSize()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index ddbac90aca..ee795ea505 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -461,6 +461,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(0, conf.getAddressSettings().get("a1").getRetroactiveMessageCount()); assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics()); assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp()); + assertEquals(null, conf.getAddressSettings().get("a1").getIDCacheSize()); assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString()); assertEquals(true, conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources()); @@ -500,6 +501,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(10, conf.getAddressSettings().get("a2").getRetroactiveMessageCount()); assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics()); assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp()); + assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 73a936e010..d3fed0d55b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -74,6 +74,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { addressSettingsToMerge.setExpiryDelay(999L); addressSettingsToMerge.setMinExpiryDelay(888L); addressSettingsToMerge.setMaxExpiryDelay(777L); + addressSettingsToMerge.setIDCacheSize(5); addressSettings.merge(addressSettingsToMerge); Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ); @@ -90,6 +91,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(Long.valueOf(999), addressSettings.getExpiryDelay()); Assert.assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay()); Assert.assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay()); + Assert.assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize()); } @Test diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 72261652c0..f57c05e128 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -572,6 +572,7 @@ false 400 265 + 500 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index ef34e4836e..091a8f4f17 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -83,5 +83,6 @@ 10000 10 false + 500 \ No newline at end of file diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml index ef34e4836e..091a8f4f17 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml @@ -83,5 +83,6 @@ 10000 10 false + 500 \ No newline at end of file diff --git a/docs/user-manual/address-settings.adoc b/docs/user-manual/address-settings.adoc index da51ac8b43..bb7bba0938 100644 --- a/docs/user-manual/address-settings.adoc +++ b/docs/user-manual/address-settings.adoc @@ -76,6 +76,7 @@ Here an example of an `address-setting` entry that might be found in the `broker 0 true false + 500 ---- @@ -371,3 +372,9 @@ For core messages (used by the core and OpenWire protocols) the broker will add For STOMP messages the broker will add a frame header named `ingress-timestamp`. The value will be the number of milliseconds since the https://en.wikipedia.org/wiki/Unix_time[epoch]. Default is `false`. + +id-cache-size:: +defines the maximum size of the duplicate ID cache for an address, as each address has it's own cache +that helps to detect and prevent the processing of duplicate messages based on their unique identification. +By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` +elements if not explicitly configured. Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes]. \ No newline at end of file diff --git a/docs/user-manual/configuration-index.adoc b/docs/user-manual/configuration-index.adoc index 703f36dcf3..d3a2fcbbbd 100644 --- a/docs/user-manual/configuration-index.adoc +++ b/docs/user-manual/configuration-index.adoc @@ -808,6 +808,10 @@ see `auto-create-queues` & `auto-create-addresses` | xref:retroactive-addresses.adoc#retroactive-addresses[retroactive-message-count] | the number of messages to preserve for future queues created on the matching address | `0` + +| xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[id-cache-size] +| The duplicate detection circular cache size +| Inherits from global `id-cache-size` |=== == bridge type diff --git a/docs/user-manual/duplicate-detection.adoc b/docs/user-manual/duplicate-detection.adoc index 24e7fc333b..2f8eb79f0e 100644 --- a/docs/user-manual/duplicate-detection.adoc +++ b/docs/user-manual/duplicate-detection.adoc @@ -82,6 +82,13 @@ If the cache has a maximum size of `n` elements, then the ``n + 1``th id stored The maximum size of the cache is configured by the parameter `id-cache-size` in `broker.xml`, the default value is `20000` elements. +To implement an address-specific `id-cache-size`, you can add to the +corresponding address-settings section in `broker.xml`. Specify the +desired `id-cache-size` value for the particular address. When a message +is sent to an address with a specific `id-cache-size` configured, it +will take precedence over the global `id-cache-size` value, allowing +for greater flexibility and optimization of duplicate ID caching. + The caches can also be configured to persist to disk or not. This is configured by the parameter `persist-id-cache`, also in `broker.xml`. If this is set to `true` then each id will be persisted to permanent storage as they are received. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 5f09dc6f95..82ce10d068 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -576,6 +577,87 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { } } + @Test + public void testDuplicateIDCacheSizeForAddressSpecificSetting() throws Exception { + server.stop(); + + final int addressIdCacheSize = 1; + final int globalIdCacheSize = 2; + final SimpleString dupIDOne = new SimpleString("1"); + final SimpleString dupIDTwo = new SimpleString("2"); + final SimpleString globalSettingsQueueName = new SimpleString("GlobalIdCacheSizeQueue"); + final SimpleString addressSettingsQueueName = new SimpleString("AddressIdCacheSizeQueue"); + AddressSettings testAddressSettings = new AddressSettings(); + testAddressSettings.setIDCacheSize(addressIdCacheSize); + + config = createDefaultInVMConfig().setIDCacheSize(globalIdCacheSize); + config.getAddressSettings().put(addressSettingsQueueName.toString(), testAddressSettings); + + server = createServer(config); + server.start(); + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + session.start(); + + session.createQueue(new QueueConfiguration(globalSettingsQueueName).setDurable(false)); + session.createQueue(new QueueConfiguration(addressSettingsQueueName).setDurable(false)); + + ClientProducer addressSettingsProducer = session.createProducer(addressSettingsQueueName); + ClientConsumer addressSettingsConsumer = session.createConsumer(addressSettingsQueueName); + ClientProducer globalSettingsProducer = session.createProducer(globalSettingsQueueName); + ClientConsumer globalSettingsConsumer = session.createConsumer(globalSettingsQueueName); + + + ClientMessage globalSettingsMessage1 = createMessage(session, 1); + globalSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + globalSettingsProducer.send(globalSettingsMessage1); + + ClientMessage globalSettingsMessage2 = createMessage(session, 2); + globalSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData()); + globalSettingsProducer.send(globalSettingsMessage2); + + // globalSettingsMessage3 will be ignored by the server - dupIDOne was only 2 messages ago + ClientMessage globalSettingsMessage3 = createMessage(session, 3); + globalSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + globalSettingsProducer.send(globalSettingsMessage3); + + globalSettingsMessage1 = globalSettingsConsumer.receive(1000); + Assert.assertEquals(1, globalSettingsMessage1.getObjectProperty(propKey)); + + globalSettingsMessage2 = globalSettingsConsumer.receive(1000); + Assert.assertEquals(2, globalSettingsMessage2.getObjectProperty(propKey)); + + // globalSettingsMessage3 will be ignored by the server because dupIDOne is duplicate + globalSettingsMessage3 = globalSettingsConsumer.receiveImmediate(); + Assert.assertNull(globalSettingsMessage3); + + ClientMessage addressSettingsMessage1 = createMessage(session, 1); + addressSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + addressSettingsProducer.send(addressSettingsMessage1); + + ClientMessage addressSettingsMessage2 = createMessage(session, 2); + addressSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData()); + addressSettingsProducer.send(addressSettingsMessage2); + + // addressSettingsMessage3 will not be ignored because the id-cache-size is only 1 + // and dupOne was 2 messages ago + ClientMessage addressSettingsMessage3 = createMessage(session, 3); + addressSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + addressSettingsProducer.send(addressSettingsMessage3); + + addressSettingsMessage1 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(1, addressSettingsMessage1.getObjectProperty(propKey)); + + addressSettingsMessage2 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(2, addressSettingsMessage2.getObjectProperty(propKey)); + + // addressSettingsMessage3 will be acked successfully by addressSettingsConsumer + // because the id-cache-size is only 1 (instead of the global size of 2) + addressSettingsMessage3 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(3, addressSettingsMessage3.getObjectProperty(propKey)); + + session.commit(); + } @Test public void testTransactedDuplicateDetection1() throws Exception { ClientSession session = sf.createSession(false, false, false);