ARTEMIS-4159 Support duplicate cache size configuration per address
This commit introduces support for configuring a specific Duplicate ID cache size per address in the Artemis server. Previously, there was only a global setting for the ID cache size, but now each address can have its own cache size. The changes include the addition of a new configuration property id-cache-size in the Artemis server configuration file. This property can now be specified under each address setting in the configuration file, and its value will determine the Duplicate ID cache size for that particular address. If the id-cache-size property is not specified for an address, it will use the global setting. The test cases have been updated to cover this new functionality, and integration test have been added to verify that address-specific cache sizes work as expected. Documentation has been added to address-settings.adoc, configuration-index.adoc and duplicate-detection.adoc
This commit is contained in:
parent
116274e9ca
commit
c5f0e3400c
artemis-server/src
main
java/org/apache/activemq/artemis/core
deployers/impl
postoffice/impl
settings/impl
resources/schema
test
java/org/apache/activemq/artemis/core
resources
docs/user-manual
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration
|
@ -342,6 +342,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";
|
||||
|
||||
private static final String ID_CACHE_SIZE = "id-cache-size";
|
||||
|
||||
private boolean validateAIO = false;
|
||||
|
||||
private boolean printPageMaxSizeUsed = false;
|
||||
|
@ -1456,6 +1458,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
|
||||
} else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) {
|
||||
addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
|
||||
} else if (ID_CACHE_SIZE.equalsIgnoreCase(name)) {
|
||||
int idCacheSize = XMLUtil.parseInt(child);
|
||||
Validators.GE_ZERO.validate(ID_CACHE_SIZE, idCacheSize);
|
||||
addressSettings.setIDCacheSize(XMLUtil.parseInt(child));
|
||||
}
|
||||
}
|
||||
return setting;
|
||||
|
|
|
@ -1407,9 +1407,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return null;
|
||||
}
|
||||
|
||||
private int resolveIdCacheSize(SimpleString address) {
|
||||
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
|
||||
return addressSettings.getIDCacheSize() == null ? idCacheSize : addressSettings.getIDCacheSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
|
||||
return getDuplicateIDCache(address, idCacheSize);
|
||||
int resolvedIdCacheSize = resolveIdCacheSize(address);
|
||||
return getDuplicateIDCache(address, resolvedIdCacheSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -293,6 +293,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Boolean enableIngressTimestamp = null;
|
||||
|
||||
private Integer idCacheSize = null;
|
||||
|
||||
//from amq5
|
||||
//make it transient
|
||||
private transient Integer queuePrefetch = null;
|
||||
|
@ -370,6 +372,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
|
||||
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
|
||||
this.enableIngressTimestamp = other.enableIngressTimestamp;
|
||||
this.idCacheSize = other.idCacheSize;
|
||||
}
|
||||
|
||||
public AddressSettings() {
|
||||
|
@ -1073,6 +1076,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public Integer getIDCacheSize() {
|
||||
return idCacheSize;
|
||||
}
|
||||
|
||||
public AddressSettings setIDCacheSize(int idCacheSize) {
|
||||
this.idCacheSize = idCacheSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* merge 2 objects in to 1
|
||||
*
|
||||
|
@ -1302,6 +1314,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (pageLimitMessages == null) {
|
||||
pageLimitMessages = merged.pageLimitMessages;
|
||||
}
|
||||
if (idCacheSize == null) {
|
||||
idCacheSize = merged.idCacheSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1577,6 +1592,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (buffer.readableBytes() > 0) {
|
||||
autoDeleteAddressesSkipUsageCheck = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
idCacheSize = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1652,6 +1671,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
|
||||
BufferHelper.sizeOfNullableLong(pageLimitBytes) +
|
||||
BufferHelper.sizeOfNullableLong(pageLimitMessages) +
|
||||
BufferHelper.sizeOfNullableInteger(idCacheSize) +
|
||||
BufferHelper.sizeOfNullableSimpleString(pageFullMessagePolicy != null ? pageFullMessagePolicy.toString() : null);
|
||||
}
|
||||
|
||||
|
@ -1802,6 +1822,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.writeNullableBoolean(buffer, autoDeleteQueuesSkipUsageCheck);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteAddressesSkipUsageCheck);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, idCacheSize);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -1883,6 +1905,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((pageLimitBytes == null) ? 0 : pageLimitBytes.hashCode());
|
||||
result = prime * result + ((pageLimitMessages == null) ? 0 : pageLimitMessages.hashCode());
|
||||
result = prime * result + ((pageFullMessagePolicy == null) ? 0 : pageFullMessagePolicy.hashCode());
|
||||
result = prime * result + ((idCacheSize == null) ? 0 : idCacheSize.hashCode());
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -2290,6 +2313,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
}
|
||||
|
||||
if (idCacheSize == null) {
|
||||
if (other.idCacheSize != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!idCacheSize.equals(other.idCacheSize)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -2438,6 +2468,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
pageLimitMessages +
|
||||
", pageFullMessagePolicy=" +
|
||||
pageFullMessagePolicy +
|
||||
", idCacheSize=" +
|
||||
idCacheSize +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4281,6 +4281,15 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="id-cache-size" type="xsd:int" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
This will set the Duplicate ID cache size for the matching address. By default the global
|
||||
setting for `id-cache-size` will be used.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||
|
|
|
@ -1120,6 +1120,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
properties.put("addressSettings.NeedToSet.autoDeleteCreatedQueues", "true");
|
||||
properties.put("addressSettings.NeedToSet.defaultExclusiveQueue", "true");
|
||||
properties.put("addressSettings.NeedToSet.defaultMaxConsumers", 10);
|
||||
properties.put("addressSettings.NeedToSet.iDCacheSize", 10);
|
||||
|
||||
configuration.parsePrefixedProperties(properties, null);
|
||||
|
||||
|
@ -1185,6 +1186,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
|||
Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isAutoDeleteCreatedQueues());
|
||||
Assert.assertTrue(configuration.getAddressSettings().get("NeedToSet").isDefaultExclusiveQueue());
|
||||
Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getDefaultMaxConsumers());
|
||||
Assert.assertEquals(Integer.valueOf(10), configuration.getAddressSettings().get("NeedToSet").getIDCacheSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -461,6 +461,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(0, conf.getAddressSettings().get("a1").getRetroactiveMessageCount());
|
||||
assertTrue(conf.getAddressSettings().get("a1").isEnableMetrics());
|
||||
assertTrue(conf.getAddressSettings().get("a1").isEnableIngressTimestamp());
|
||||
assertEquals(null, conf.getAddressSettings().get("a1").getIDCacheSize());
|
||||
|
||||
assertEquals("a2.1", conf.getAddressSettings().get("a2").getDeadLetterAddress().toString());
|
||||
assertEquals(true, conf.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||
|
@ -500,6 +501,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(10, conf.getAddressSettings().get("a2").getRetroactiveMessageCount());
|
||||
assertFalse(conf.getAddressSettings().get("a2").isEnableMetrics());
|
||||
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
|
||||
assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize());
|
||||
|
||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||
|
|
|
@ -74,6 +74,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
|
|||
addressSettingsToMerge.setExpiryDelay(999L);
|
||||
addressSettingsToMerge.setMinExpiryDelay(888L);
|
||||
addressSettingsToMerge.setMaxExpiryDelay(777L);
|
||||
addressSettingsToMerge.setIDCacheSize(5);
|
||||
|
||||
addressSettings.merge(addressSettingsToMerge);
|
||||
Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
|
||||
|
@ -90,6 +91,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(Long.valueOf(999), addressSettings.getExpiryDelay());
|
||||
Assert.assertEquals(Long.valueOf(888), addressSettings.getMinExpiryDelay());
|
||||
Assert.assertEquals(Long.valueOf(777), addressSettings.getMaxExpiryDelay());
|
||||
Assert.assertEquals(Integer.valueOf(5), addressSettings.getIDCacheSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -572,6 +572,7 @@
|
|||
<enable-metrics>false</enable-metrics>
|
||||
<management-browse-page-size>400</management-browse-page-size>
|
||||
<management-message-attribute-size-limit>265</management-message-attribute-size-limit>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
|
|
|
@ -83,5 +83,6 @@
|
|||
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
|
@ -83,5 +83,6 @@
|
|||
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
|
@ -76,6 +76,7 @@ Here an example of an `address-setting` entry that might be found in the `broker
|
|||
<retroactive-message-count>0</retroactive-message-count>
|
||||
<enable-metrics>true</enable-metrics>
|
||||
<enable-ingress-timestamp>false</enable-ingress-timestamp>
|
||||
<id-cache-size>500</id-cache-size>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
----
|
||||
|
@ -371,3 +372,9 @@ For core messages (used by the core and OpenWire protocols) the broker will add
|
|||
For STOMP messages the broker will add a frame header named `ingress-timestamp`.
|
||||
The value will be the number of milliseconds since the https://en.wikipedia.org/wiki/Unix_time[epoch].
|
||||
Default is `false`.
|
||||
|
||||
id-cache-size::
|
||||
defines the maximum size of the duplicate ID cache for an address, as each address has it's own cache
|
||||
that helps to detect and prevent the processing of duplicate messages based on their unique identification.
|
||||
By default, the `id-cache-size` setting inherits from the global `id-cache-size`, with a default of `20000`
|
||||
elements if not explicitly configured. Read more about xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[duplicate id cache sizes].
|
|
@ -808,6 +808,10 @@ see `auto-create-queues` & `auto-create-addresses`
|
|||
| xref:retroactive-addresses.adoc#retroactive-addresses[retroactive-message-count]
|
||||
| the number of messages to preserve for future queues created on the matching address
|
||||
| `0`
|
||||
|
||||
| xref:duplicate-detection.adoc#configuring-the-duplicate-id-cache[id-cache-size]
|
||||
| The duplicate detection circular cache size
|
||||
| Inherits from global `id-cache-size`
|
||||
|===
|
||||
|
||||
== bridge type
|
||||
|
|
|
@ -82,6 +82,13 @@ If the cache has a maximum size of `n` elements, then the ``n + 1``th id stored
|
|||
|
||||
The maximum size of the cache is configured by the parameter `id-cache-size` in `broker.xml`, the default value is `20000` elements.
|
||||
|
||||
To implement an address-specific `id-cache-size`, you can add to the
|
||||
corresponding address-settings section in `broker.xml`. Specify the
|
||||
desired `id-cache-size` value for the particular address. When a message
|
||||
is sent to an address with a specific `id-cache-size` configured, it
|
||||
will take precedence over the global `id-cache-size` value, allowing
|
||||
for greater flexibility and optimization of duplicate ID caching.
|
||||
|
||||
The caches can also be configured to persist to disk or not.
|
||||
This is configured by the parameter `persist-id-cache`, also in `broker.xml`.
|
||||
If this is set to `true` then each id will be persisted to permanent storage as they are received.
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
@ -576,6 +577,87 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDuplicateIDCacheSizeForAddressSpecificSetting() throws Exception {
|
||||
server.stop();
|
||||
|
||||
final int addressIdCacheSize = 1;
|
||||
final int globalIdCacheSize = 2;
|
||||
final SimpleString dupIDOne = new SimpleString("1");
|
||||
final SimpleString dupIDTwo = new SimpleString("2");
|
||||
final SimpleString globalSettingsQueueName = new SimpleString("GlobalIdCacheSizeQueue");
|
||||
final SimpleString addressSettingsQueueName = new SimpleString("AddressIdCacheSizeQueue");
|
||||
AddressSettings testAddressSettings = new AddressSettings();
|
||||
testAddressSettings.setIDCacheSize(addressIdCacheSize);
|
||||
|
||||
config = createDefaultInVMConfig().setIDCacheSize(globalIdCacheSize);
|
||||
config.getAddressSettings().put(addressSettingsQueueName.toString(), testAddressSettings);
|
||||
|
||||
server = createServer(config);
|
||||
server.start();
|
||||
sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
session.start();
|
||||
|
||||
session.createQueue(new QueueConfiguration(globalSettingsQueueName).setDurable(false));
|
||||
session.createQueue(new QueueConfiguration(addressSettingsQueueName).setDurable(false));
|
||||
|
||||
ClientProducer addressSettingsProducer = session.createProducer(addressSettingsQueueName);
|
||||
ClientConsumer addressSettingsConsumer = session.createConsumer(addressSettingsQueueName);
|
||||
ClientProducer globalSettingsProducer = session.createProducer(globalSettingsQueueName);
|
||||
ClientConsumer globalSettingsConsumer = session.createConsumer(globalSettingsQueueName);
|
||||
|
||||
|
||||
ClientMessage globalSettingsMessage1 = createMessage(session, 1);
|
||||
globalSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData());
|
||||
globalSettingsProducer.send(globalSettingsMessage1);
|
||||
|
||||
ClientMessage globalSettingsMessage2 = createMessage(session, 2);
|
||||
globalSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData());
|
||||
globalSettingsProducer.send(globalSettingsMessage2);
|
||||
|
||||
// globalSettingsMessage3 will be ignored by the server - dupIDOne was only 2 messages ago
|
||||
ClientMessage globalSettingsMessage3 = createMessage(session, 3);
|
||||
globalSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData());
|
||||
globalSettingsProducer.send(globalSettingsMessage3);
|
||||
|
||||
globalSettingsMessage1 = globalSettingsConsumer.receive(1000);
|
||||
Assert.assertEquals(1, globalSettingsMessage1.getObjectProperty(propKey));
|
||||
|
||||
globalSettingsMessage2 = globalSettingsConsumer.receive(1000);
|
||||
Assert.assertEquals(2, globalSettingsMessage2.getObjectProperty(propKey));
|
||||
|
||||
// globalSettingsMessage3 will be ignored by the server because dupIDOne is duplicate
|
||||
globalSettingsMessage3 = globalSettingsConsumer.receiveImmediate();
|
||||
Assert.assertNull(globalSettingsMessage3);
|
||||
|
||||
ClientMessage addressSettingsMessage1 = createMessage(session, 1);
|
||||
addressSettingsMessage1.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData());
|
||||
addressSettingsProducer.send(addressSettingsMessage1);
|
||||
|
||||
ClientMessage addressSettingsMessage2 = createMessage(session, 2);
|
||||
addressSettingsMessage2.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDTwo.getData());
|
||||
addressSettingsProducer.send(addressSettingsMessage2);
|
||||
|
||||
// addressSettingsMessage3 will not be ignored because the id-cache-size is only 1
|
||||
// and dupOne was 2 messages ago
|
||||
ClientMessage addressSettingsMessage3 = createMessage(session, 3);
|
||||
addressSettingsMessage3.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupIDOne.getData());
|
||||
addressSettingsProducer.send(addressSettingsMessage3);
|
||||
|
||||
addressSettingsMessage1 = addressSettingsConsumer.receive(1000);
|
||||
Assert.assertEquals(1, addressSettingsMessage1.getObjectProperty(propKey));
|
||||
|
||||
addressSettingsMessage2 = addressSettingsConsumer.receive(1000);
|
||||
Assert.assertEquals(2, addressSettingsMessage2.getObjectProperty(propKey));
|
||||
|
||||
// addressSettingsMessage3 will be acked successfully by addressSettingsConsumer
|
||||
// because the id-cache-size is only 1 (instead of the global size of 2)
|
||||
addressSettingsMessage3 = addressSettingsConsumer.receive(1000);
|
||||
Assert.assertEquals(3, addressSettingsMessage3.getObjectProperty(propKey));
|
||||
|
||||
session.commit();
|
||||
}
|
||||
@Test
|
||||
public void testTransactedDuplicateDetection1() throws Exception {
|
||||
ClientSession session = sf.createSession(false, false, false);
|
||||
|
|
Loading…
Reference in New Issue