From 373803a85d0b6da47e0dee8a4f2deb1a68caa8c8 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Wed, 24 May 2017 08:33:19 +0100 Subject: [PATCH] ARTEMIS-1179: Add Optional Client JMS Destination Cache Add topic and queue cache maps in Session. Add configuration to use cache or not with defaulting to false, which keeps existing behaviour as the default. --- .../jms/client/ActiveMQConnection.java | 9 ++++- .../jms/client/ActiveMQConnectionFactory.java | 23 ++++++++--- .../artemis/jms/client/ActiveMQSession.java | 38 +++++++++++++++++-- .../jms/client/ActiveMQXAConnection.java | 3 +- .../artemis/jms/client/ActiveMQXASession.java | 3 +- .../artemis/uri/ConnectionFactoryURITest.java | 12 ++++++ .../ActiveMQRAManagedConnectionFactory.java | 8 ++++ .../artemis/ra/ActiveMQResourceAdapter.java | 31 +++++++++++++++ .../ra/ConnectionFactoryProperties.java | 17 +++++++++ docs/user-manual/en/perf-tuning.md | 5 +++ docs/user-manual/en/using-jms.md | 11 ++++++ .../artemis-ra-rar/src/main/resources/ra.xml | 6 +++ .../ra/ActiveMQResourceAdapterConfigTest.java | 6 +++ 13 files changed, 159 insertions(+), 13 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index c90e630ce1..90ab952599 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -126,6 +126,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private final int transactionBatchSize; + private final boolean cacheDestinations; + private ClientSession initialSession; private final Exception creationStack; @@ -143,6 +145,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final boolean cacheDestinations, final ClientSessionFactory sessionFactory) { this.options = options; @@ -164,6 +167,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme this.transactionBatchSize = transactionBatchSize; + this.cacheDestinations = cacheDestinations; + creationStack = new Exception(); } @@ -654,9 +659,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme ClientSession session, int type) { if (isXA) { - return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, session, type); + return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type); } else { - return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, session, type); + return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java index dee81573d1..06acd25bb0 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnectionFactory.java @@ -84,6 +84,8 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte private String deserializationWhiteList; + private boolean cacheDestinations; + private boolean finalizeChecks; @Override @@ -428,6 +430,15 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte this.transactionBatchSize = transactionBatchSize; } + public synchronized boolean isCacheDestinations() { + return this.cacheDestinations; + } + + public synchronized void setCacheDestinations(final boolean cacheDestinations) { + checkWrite(); + this.cacheDestinations = cacheDestinations; + } + public synchronized long getClientFailureCheckPeriod() { return serverLocator.getClientFailureCheckPeriod(); } @@ -766,19 +777,19 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte if (isXA) { if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) { - connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } } else { if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) { - connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory); + connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 2ea145a77d..a8aceec8ef 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -45,8 +45,10 @@ import javax.jms.TransactionInProgressException; import javax.transaction.xa.XAResource; import java.io.Serializable; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -95,6 +97,12 @@ public class ActiveMQSession implements QueueSession, TopicSession { private final Set consumers = new HashSet<>(); + private final boolean cacheDestination; + + private final Map topicCache = new ConcurrentHashMap<>(); + + private final Map queueCache = new ConcurrentHashMap<>(); + // Constructors -------------------------------------------------- protected ActiveMQSession(final ConnectionFactoryOptions options, @@ -102,6 +110,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { final boolean transacted, final boolean xa, final int ackMode, + final boolean cacheDestination, final ClientSession session, final int sessionType) { this.options = options; @@ -117,6 +126,8 @@ public class ActiveMQSession implements QueueSession, TopicSession { this.transacted = transacted; this.xa = xa; + + this.cacheDestination = cacheDestination; } // Session implementation ---------------------------------------- @@ -255,6 +266,8 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw JMSExceptionHelper.convertFromActiveMQException(e); } } + topicCache.clear(); + queueCache.clear(); } @Override @@ -367,7 +380,17 @@ public class ActiveMQSession implements QueueSession, TopicSession { } try { - return internalCreateQueue(queueName, false); + Queue queue = null; + if (cacheDestination) { + queue = queueCache.get(queueName); + } + if (queue == null) { + queue = internalCreateQueue(queueName, false); + } + if (cacheDestination) { + queueCache.put(queueName, queue); + } + return queue; } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } @@ -396,9 +419,18 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) { throw new IllegalStateException("Cannot create a topic on a QueueSession"); } - try { - return internalCreateTopic(topicName, false); + Topic topic = null; + if (cacheDestination) { + topic = topicCache.get(topicName); + } + if (topic == null) { + topic = internalCreateTopic(topicName, false); + } + if (cacheDestination) { + topicCache.put(topicName, topic); + } + return topic; } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java index 4407fbbb73..fcc9bb2516 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java @@ -41,8 +41,9 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final boolean cacheDestinations, final ClientSessionFactory sessionFactory) { - super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory); + super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory); } @Override diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java index 4a7694fc8e..6ec936e84b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXASession.java @@ -36,8 +36,9 @@ public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession boolean transacted, boolean xa, int ackMode, + boolean cacheDestinations, ClientSession session, int sessionType) { - super(options, connection, transacted, xa, ackMode, session, sessionType); + super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType); } } diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java index dc2d459790..9c1b9b8d6c 100644 --- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java +++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java @@ -400,6 +400,18 @@ public class ConnectionFactoryURITest { checkEquals(bean, connectionFactoryWithHA, factory); } + @Test + public void testCacheDestinations() throws Exception { + ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030"), null); + + Assert.assertFalse(factory.isCacheDestinations()); + + factory = parser.newObject(new URI("tcp://localhost:3030?cacheDestinations=true"), null); + + Assert.assertTrue(factory.isCacheDestinations()); + + } + private void populate(StringBuilder sb, BeanUtilsBean bean, ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException { diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java index a3541a6ec4..da99a1326a 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java @@ -580,6 +580,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti mcfProperties.setUseGlobalPools(useGlobalPools); } + public Boolean isCacheDestinations() { + return mcfProperties.isCacheDestinations(); + } + + public void setCacheDestinations(final Boolean cacheDestinations) { + mcfProperties.setCacheDestinations(cacheDestinations); + } + public Integer getScheduledThreadPoolMaxSize() { return mcfProperties.getScheduledThreadPoolMaxSize(); } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index e9b13239f2..36eabeb85c 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -634,6 +634,32 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { return raProperties.isFailoverOnInitialConnection(); } + /** + * Set cacheDestinations + * + * @param cacheDestinations The value + */ + public void setCacheDestinations(final Boolean cacheDestinations) { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")"); + } + + raProperties.setCacheDestinations(cacheDestinations); + } + + /** + * Get isCacheDestinations + * + * @return The value + */ + public Boolean isCacheDestinations() { + if (ActiveMQResourceAdapter.trace) { + ActiveMQRALogger.LOGGER.trace("isCacheDestinations()"); + } + + return raProperties.isCacheDestinations(); + } + /** * Set compressLargeMessage * @@ -1911,6 +1937,11 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { cf.setFailoverOnInitialConnection(val); } + val = overrideProperties.isCacheDestinations() != null ? overrideProperties.isCacheDestinations() : raProperties.isCacheDestinations(); + if (val != null) { + cf.setCacheDestinations(val); + } + Integer val2 = overrideProperties.getConsumerMaxRate() != null ? overrideProperties.getConsumerMaxRate() : raProperties.getConsumerMaxRate(); if (val2 != null) { cf.setConsumerMaxRate(val2); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index 442952fb37..ba0484aa78 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -112,6 +112,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions { private Boolean useGlobalPools; + private Boolean cacheDestinations; + private Integer initialMessagePacketSize; private Integer scheduledThreadPoolMaxSize; @@ -612,6 +614,21 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions { this.useGlobalPools = useGlobalPools; } + public Boolean isCacheDestinations() { + if (ConnectionFactoryProperties.trace) { + ActiveMQRALogger.LOGGER.trace("isCacheDestinations()"); + } + return cacheDestinations; + } + + public void setCacheDestinations(final Boolean cacheDestinations) { + if (ConnectionFactoryProperties.trace) { + ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")"); + } + hasBeenUpdated = true; + this.cacheDestinations = cacheDestinations; + } + public Integer getScheduledThreadPoolMaxSize() { if (ConnectionFactoryProperties.trace) { ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()"); diff --git a/docs/user-manual/en/perf-tuning.md b/docs/user-manual/en/perf-tuning.md index f131a151a1..010c19feb4 100644 --- a/docs/user-manual/en/perf-tuning.md +++ b/docs/user-manual/en/perf-tuning.md @@ -150,6 +150,11 @@ tuning: java.lang.String does not require copying before it is written to the wire, so if you re-use `SimpleString` instances between calls then you can avoid some unnecessary copying. + +- If using frameworks like Spring, configure destinations permanently broker side + and enable `destinationCache` on the client side. + See the [Setting The Destination Cache](using-jms.md) + for more information on this. ## Tuning Transport Settings diff --git a/docs/user-manual/en/using-jms.md b/docs/user-manual/en/using-jms.md index d921cafd79..6646293210 100644 --- a/docs/user-manual/en/using-jms.md +++ b/docs/user-manual/en/using-jms.md @@ -377,3 +377,14 @@ consumer to send acknowledgements in batches rather than individually saving valuable bandwidth. This can be configured on the connection factory via the `transactionBatchSize` element and is set in bytes. The default is 1024 \* 1024. + +### Setting The Destination Cache + +Many frameworks such as Spring resolve the destination by name on every operation, +this can cause a performance issue and extra calls to the broker, +in a scenario where destinations (addresses) are permanent broker side, +such as they are managed by a platform or operations team. +using `destinationCache` element, you can toggle on the destination cache +to improve the performance and reduce the calls to the broker. +This should not be used if destinations (addresses) are not permanent broker side, +as in dynamic creation/deletion. diff --git a/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml b/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml index db571a33be..6292ee6829 100644 --- a/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml +++ b/examples/features/sub-modules/artemis-ra-rar/src/main/resources/ra.xml @@ -258,6 +258,12 @@ PasswordCodec java.lang.String org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=clusterpassword;algorithm=something + + + Cache destinations per session + CacheDestinations + java.lang.Boolean + false --> diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java index cd051fe6c2..e4dd15cab9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ActiveMQResourceAdapterConfigTest.java @@ -253,6 +253,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase { " boolean\n" + " \n" + " \n" + + " " + + " Cache destinations per session" + + " CacheDestinations" + + " boolean" + + " " + + " " + " \n" + " max number of threads for scheduled thread pool\n" + " ScheduledThreadPoolMaxSize\n" +