diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index b7d28d29a5..e9eb2874ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -342,6 +342,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp"; + private static final String ID_CACHE_SIZE = "id-cache-size"; + private boolean validateAIO = false; private boolean printPageMaxSizeUsed = false; @@ -1456,6 +1458,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child)); } else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) { addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child)); + } else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) { + int idCacheSize = XMLUtil.parseInt(child); + Validators.GE_ZERO.validate(ID_CACHE_SIZE, idCacheSize); + addressSettings.setIDCacheSize(XMLUtil.parseInt(child)); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 7b7e68c336..738fbef3f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1407,9 +1407,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return null; } + private int resolveIdCacheSize(SimpleString address) { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + return addressSettings.getIDCacheSize() == null ? idCacheSize : addressSettings.getIDCacheSize(); + } + @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { - return getDuplicateIDCache(address, idCacheSize); + int resolvedIdCacheSize = resolveIdCacheSize(address); + return getDuplicateIDCache(address, resolvedIdCacheSize); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 202b082902..aee8c2fe9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -293,6 +293,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean enableIngressTimestamp = null; + private Integer idCacheSize = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -370,6 +372,7 @@ public class AddressSettings implements Mergeable, Serializable this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit; this.enableIngressTimestamp = other.enableIngressTimestamp; + this.idCacheSize = other.idCacheSize; } public AddressSettings() { @@ -1073,6 +1076,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public Integer getIDCacheSize() { + return idCacheSize; + } + + public AddressSettings setIDCacheSize(int idCacheSize) { + this.idCacheSize = idCacheSize; + return this; + } + /** * merge 2 objects in to 1 * @@ -1302,6 +1314,9 @@ public class AddressSettings implements Mergeable, Serializable if (pageLimitMessages == null) { pageLimitMessages = merged.pageLimitMessages; } + if (idCacheSize == null) { + idCacheSize = merged.idCacheSize; + } } @Override @@ -1577,6 +1592,10 @@ public class AddressSettings implements Mergeable, Serializable if (buffer.readableBytes() > 0) { autoDeleteAddressesSkipUsageCheck = BufferHelper.readNullableBoolean(buffer); } + + if (buffer.readableBytes() > 0) { + idCacheSize = BufferHelper.readNullableInteger(buffer); + } } @Override @@ -1652,6 +1671,7 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableInteger(maxReadPageBytes) + BufferHelper.sizeOfNullableLong(pageLimitBytes) + BufferHelper.sizeOfNullableLong(pageLimitMessages) + + BufferHelper.sizeOfNullableInteger(idCacheSize) + BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null); } @@ -1802,6 +1822,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, autoDeleteQueuesSkipUsageCheck); BufferHelper.writeNullableBoolean(buffer, autoDeleteAddressesSkipUsageCheck); + + BufferHelper.writeNullableInteger(buffer, idCacheSize); } /* (non-Javadoc) @@ -1883,6 +1905,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((pageLimitBytes == null) ? 0 : pageLimitBytes.hashCode()); result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode()); result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode()); + result = prime * result + ((idCacheSize == null) ? 0 : idCacheSize.hashCode()); return result; } @@ -2290,6 +2313,13 @@ public class AddressSettings implements Mergeable, Serializable return false; } + if (idCacheSize == null) { + if (other.idCacheSize != null) { + return false; + } + } else if (!idCacheSize.equals(other.idCacheSize)) { + return false; + } return true; } @@ -2438,6 +2468,8 @@ public class AddressSettings implements Mergeable, Serializable pageLimitMessages + ", pageFullMessagePolicy=" + pageFullMessagePolicy + + ", idCacheSize=" + + idCacheSize + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5b23f0ffff..37144f8ba4 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -4281,6 +4281,15 @@ + + + + This will set the Duplicate ID cache size for the matching address. By default the global + setting for `id-cache-size` will be used. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 5c71d0cece..5950b9d0c3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -1120,6 +1120,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase { properties.put("addressSettings.NeedToSet.autoDeleteCreatedQueues", "true"); properties.put("addressSettings.NeedToSet.defaultExclusiveQueue", "true"); properties.put("addressSettings.NeedToSet.defaultMaxConsumers", 10); + properties.put("addressSettings.NeedToSet.iDCacheSize", 10); configuration.parsePrefixedProperties(properties, null); @@ -1185,6 +1186,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isAutoDeleteCreatedQueues()); Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isDefaultExclusiveQueue()); Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getDefaultMaxConsumers()); + Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getIDCacheSize()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index ddbac90aca..ee795ea505 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -461,6 +461,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(0, conf.getAddressSettings().get("a1").getRetroactiveMessageCount()); assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics()); assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp()); + assertEquals(null, conf.getAddressSettings().get("a1").getIDCacheSize()); assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString()); assertEquals(true, conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources()); @@ -500,6 +501,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(10, conf.getAddressSettings().get("a2").getRetroactiveMessageCount()); assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics()); assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp()); + assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 73a936e010..d3fed0d55b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -74,6 +74,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { addressSettingsToMerge.setExpiryDelay(999L); addressSettingsToMerge.setMinExpiryDelay(888L); addressSettingsToMerge.setMaxExpiryDelay(777L); + addressSettingsToMerge.setIDCacheSize(5); addressSettings.merge(addressSettingsToMerge); Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ); @@ -90,6 +91,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(Long.valueOf(999), addressSettings.getExpiryDelay()); Assert.assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay()); Assert.assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay()); + Assert.assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize()); } @Test diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 72261652c0..f57c05e128 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -572,6 +572,7 @@ false 400 265 + 500 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index ef34e4836e..091a8f4f17 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -83,5 +83,6 @@ 10000 10 false + 500 \ No newline at end of file diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml index ef34e4836e..091a8f4f17 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml @@ -83,5 +83,6 @@ 10000 10 false + 500 \ No newline at end of file diff --git a/docs/user-manual/address-settings.adoc b/docs/user-manual/address-settings.adoc index da51ac8b43..bb7bba0938 100644 --- a/docs/user-manual/address-settings.adoc +++ b/docs/user-manual/address-settings.adoc @@ -76,6 +76,7 @@ Here an example of an `address-setting` entry that might be found in the `broker 0 true false + 500 ---- @@ -371,3 +372,9 @@ For core messages (used by the core and OpenWire protocols) the broker will add For STOMP messages the broker will add a frame header named `ingress-timestamp`. The value will be the number of milliseconds since the https://en.wikipedia.org/wiki/Unix_time[epoch]. Default is `false`. + +id-cache-size:: +defines the maximum size of the duplicate ID cache for an address, as each address has it's own cache +that helps to detect and prevent the processing of duplicate messages based on their unique identification. +By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000` +elements if not explicitly configured. Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes]. \ No newline at end of file diff --git a/docs/user-manual/configuration-index.adoc b/docs/user-manual/configuration-index.adoc index 703f36dcf3..d3a2fcbbbd 100644 --- a/docs/user-manual/configuration-index.adoc +++ b/docs/user-manual/configuration-index.adoc @@ -808,6 +808,10 @@ see `auto-create-queues` & `auto-create-addresses` | xref:retroactive-addresses.adoc#retroactive-addresses[retroactive-message-count] | the number of messages to preserve for future queues created on the matching address | `0` + +| xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[id-cache-size] +| The duplicate detection circular cache size +| Inherits from global `id-cache-size` |=== == bridge type diff --git a/docs/user-manual/duplicate-detection.adoc b/docs/user-manual/duplicate-detection.adoc index 24e7fc333b..2f8eb79f0e 100644 --- a/docs/user-manual/duplicate-detection.adoc +++ b/docs/user-manual/duplicate-detection.adoc @@ -82,6 +82,13 @@ If the cache has a maximum size of `n` elements, then the ``n + 1``th id stored The maximum size of the cache is configured by the parameter `id-cache-size` in `broker.xml`, the default value is `20000` elements. +To implement an address-specific `id-cache-size`, you can add to the +corresponding address-settings section in `broker.xml`. Specify the +desired `id-cache-size` value for the particular address. When a message +is sent to an address with a specific `id-cache-size` configured, it +will take precedence over the global `id-cache-size` value, allowing +for greater flexibility and optimization of duplicate ID caching. + The caches can also be configured to persist to disk or not. This is configured by the parameter `persist-id-cache`, also in `broker.xml`. If this is set to `true` then each id will be persisted to permanent storage as they are received. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 5f09dc6f95..82ce10d068 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -576,6 +577,87 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { } } + @Test + public void testDuplicateIDCacheSizeForAddressSpecificSetting() throws Exception { + server.stop(); + + final int addressIdCacheSize = 1; + final int globalIdCacheSize = 2; + final SimpleString dupIDOne = new SimpleString("1"); + final SimpleString dupIDTwo = new SimpleString("2"); + final SimpleString globalSettingsQueueName = new SimpleString("GlobalIdCacheSizeQueue"); + final SimpleString addressSettingsQueueName = new SimpleString("AddressIdCacheSizeQueue"); + AddressSettings testAddressSettings = new AddressSettings(); + testAddressSettings.setIDCacheSize(addressIdCacheSize); + + config = createDefaultInVMConfig().setIDCacheSize(globalIdCacheSize); + config.getAddressSettings().put(addressSettingsQueueName.toString(), testAddressSettings); + + server = createServer(config); + server.start(); + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + session.start(); + + session.createQueue(new QueueConfiguration(globalSettingsQueueName).setDurable(false)); + session.createQueue(new QueueConfiguration(addressSettingsQueueName).setDurable(false)); + + ClientProducer addressSettingsProducer = session.createProducer(addressSettingsQueueName); + ClientConsumer addressSettingsConsumer = session.createConsumer(addressSettingsQueueName); + ClientProducer globalSettingsProducer = session.createProducer(globalSettingsQueueName); + ClientConsumer globalSettingsConsumer = session.createConsumer(globalSettingsQueueName); + + + ClientMessage globalSettingsMessage1 = createMessage(session, 1); + globalSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + globalSettingsProducer.send(globalSettingsMessage1); + + ClientMessage globalSettingsMessage2 = createMessage(session, 2); + globalSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData()); + globalSettingsProducer.send(globalSettingsMessage2); + + // globalSettingsMessage3 will be ignored by the server - dupIDOne was only 2 messages ago + ClientMessage globalSettingsMessage3 = createMessage(session, 3); + globalSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + globalSettingsProducer.send(globalSettingsMessage3); + + globalSettingsMessage1 = globalSettingsConsumer.receive(1000); + Assert.assertEquals(1, globalSettingsMessage1.getObjectProperty(propKey)); + + globalSettingsMessage2 = globalSettingsConsumer.receive(1000); + Assert.assertEquals(2, globalSettingsMessage2.getObjectProperty(propKey)); + + // globalSettingsMessage3 will be ignored by the server because dupIDOne is duplicate + globalSettingsMessage3 = globalSettingsConsumer.receiveImmediate(); + Assert.assertNull(globalSettingsMessage3); + + ClientMessage addressSettingsMessage1 = createMessage(session, 1); + addressSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + addressSettingsProducer.send(addressSettingsMessage1); + + ClientMessage addressSettingsMessage2 = createMessage(session, 2); + addressSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData()); + addressSettingsProducer.send(addressSettingsMessage2); + + // addressSettingsMessage3 will not be ignored because the id-cache-size is only 1 + // and dupOne was 2 messages ago + ClientMessage addressSettingsMessage3 = createMessage(session, 3); + addressSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData()); + addressSettingsProducer.send(addressSettingsMessage3); + + addressSettingsMessage1 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(1, addressSettingsMessage1.getObjectProperty(propKey)); + + addressSettingsMessage2 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(2, addressSettingsMessage2.getObjectProperty(propKey)); + + // addressSettingsMessage3 will be acked successfully by addressSettingsConsumer + // because the id-cache-size is only 1 (instead of the global size of 2) + addressSettingsMessage3 = addressSettingsConsumer.receive(1000); + Assert.assertEquals(3, addressSettingsMessage3.getObjectProperty(propKey)); + + session.commit(); + } @Test public void testTransactedDuplicateDetection1() throws Exception { ClientSession session = sf.createSession(false, false, false);