ARTEMIS-1179: Add Optional Client JMS Destination Cache
Add topic and queue cache maps in Session. Add configuration to use cache or not with defaulting to false, which keeps existing behaviour as the default.
This commit is contained in:
parent
9ff301dc38
commit
373803a85d
|
@ -126,6 +126,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
|||
|
||||
private final int transactionBatchSize;
|
||||
|
||||
private final boolean cacheDestinations;
|
||||
|
||||
private ClientSession initialSession;
|
||||
|
||||
private final Exception creationStack;
|
||||
|
@ -143,6 +145,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
|||
final String clientID,
|
||||
final int dupsOKBatchSize,
|
||||
final int transactionBatchSize,
|
||||
final boolean cacheDestinations,
|
||||
final ClientSessionFactory sessionFactory) {
|
||||
this.options = options;
|
||||
|
||||
|
@ -164,6 +167,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
|||
|
||||
this.transactionBatchSize = transactionBatchSize;
|
||||
|
||||
this.cacheDestinations = cacheDestinations;
|
||||
|
||||
creationStack = new Exception();
|
||||
}
|
||||
|
||||
|
@ -654,9 +659,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
|||
ClientSession session,
|
||||
int type) {
|
||||
if (isXA) {
|
||||
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, session, type);
|
||||
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type);
|
||||
} else {
|
||||
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, session, type);
|
||||
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,8 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
|
|||
|
||||
private String deserializationWhiteList;
|
||||
|
||||
private boolean cacheDestinations;
|
||||
|
||||
private boolean finalizeChecks;
|
||||
|
||||
@Override
|
||||
|
@ -428,6 +430,15 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
|
|||
this.transactionBatchSize = transactionBatchSize;
|
||||
}
|
||||
|
||||
public synchronized boolean isCacheDestinations() {
|
||||
return this.cacheDestinations;
|
||||
}
|
||||
|
||||
public synchronized void setCacheDestinations(final boolean cacheDestinations) {
|
||||
checkWrite();
|
||||
this.cacheDestinations = cacheDestinations;
|
||||
}
|
||||
|
||||
public synchronized long getClientFailureCheckPeriod() {
|
||||
return serverLocator.getClientFailureCheckPeriod();
|
||||
}
|
||||
|
@ -766,19 +777,19 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
|
|||
|
||||
if (isXA) {
|
||||
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
}
|
||||
} else {
|
||||
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
|
||||
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,8 +45,10 @@ import javax.jms.TransactionInProgressException;
|
|||
import javax.transaction.xa.XAResource;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
|
@ -95,6 +97,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
|
||||
private final Set<ActiveMQMessageConsumer> consumers = new HashSet<>();
|
||||
|
||||
private final boolean cacheDestination;
|
||||
|
||||
private final Map<String, Topic> topicCache = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, Queue> queueCache = new ConcurrentHashMap<>();
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
protected ActiveMQSession(final ConnectionFactoryOptions options,
|
||||
|
@ -102,6 +110,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
final boolean transacted,
|
||||
final boolean xa,
|
||||
final int ackMode,
|
||||
final boolean cacheDestination,
|
||||
final ClientSession session,
|
||||
final int sessionType) {
|
||||
this.options = options;
|
||||
|
@ -117,6 +126,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
this.transacted = transacted;
|
||||
|
||||
this.xa = xa;
|
||||
|
||||
this.cacheDestination = cacheDestination;
|
||||
}
|
||||
|
||||
// Session implementation ----------------------------------------
|
||||
|
@ -255,6 +266,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
}
|
||||
topicCache.clear();
|
||||
queueCache.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -367,7 +380,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
}
|
||||
|
||||
try {
|
||||
return internalCreateQueue(queueName, false);
|
||||
Queue queue = null;
|
||||
if (cacheDestination) {
|
||||
queue = queueCache.get(queueName);
|
||||
}
|
||||
if (queue == null) {
|
||||
queue = internalCreateQueue(queueName, false);
|
||||
}
|
||||
if (cacheDestination) {
|
||||
queueCache.put(queueName, queue);
|
||||
}
|
||||
return queue;
|
||||
} catch (ActiveMQException e) {
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
|
@ -396,9 +419,18 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) {
|
||||
throw new IllegalStateException("Cannot create a topic on a QueueSession");
|
||||
}
|
||||
|
||||
try {
|
||||
return internalCreateTopic(topicName, false);
|
||||
Topic topic = null;
|
||||
if (cacheDestination) {
|
||||
topic = topicCache.get(topicName);
|
||||
}
|
||||
if (topic == null) {
|
||||
topic = internalCreateTopic(topicName, false);
|
||||
}
|
||||
if (cacheDestination) {
|
||||
topicCache.put(topicName, topic);
|
||||
}
|
||||
return topic;
|
||||
} catch (ActiveMQException e) {
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
|
|
|
@ -41,8 +41,9 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
|
|||
final String clientID,
|
||||
final int dupsOKBatchSize,
|
||||
final int transactionBatchSize,
|
||||
final boolean cacheDestinations,
|
||||
final ClientSessionFactory sessionFactory) {
|
||||
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory);
|
||||
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,8 +36,9 @@ public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession
|
|||
boolean transacted,
|
||||
boolean xa,
|
||||
int ackMode,
|
||||
boolean cacheDestinations,
|
||||
ClientSession session,
|
||||
int sessionType) {
|
||||
super(options, connection, transacted, xa, ackMode, session, sessionType);
|
||||
super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -400,6 +400,18 @@ public class ConnectionFactoryURITest {
|
|||
checkEquals(bean, connectionFactoryWithHA, factory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheDestinations() throws Exception {
|
||||
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030"), null);
|
||||
|
||||
Assert.assertFalse(factory.isCacheDestinations());
|
||||
|
||||
factory = parser.newObject(new URI("tcp://localhost:3030?cacheDestinations=true"), null);
|
||||
|
||||
Assert.assertTrue(factory.isCacheDestinations());
|
||||
|
||||
}
|
||||
|
||||
private void populate(StringBuilder sb,
|
||||
BeanUtilsBean bean,
|
||||
ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException {
|
||||
|
|
|
@ -580,6 +580,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
|
|||
mcfProperties.setUseGlobalPools(useGlobalPools);
|
||||
}
|
||||
|
||||
public Boolean isCacheDestinations() {
|
||||
return mcfProperties.isCacheDestinations();
|
||||
}
|
||||
|
||||
public void setCacheDestinations(final Boolean cacheDestinations) {
|
||||
mcfProperties.setCacheDestinations(cacheDestinations);
|
||||
}
|
||||
|
||||
public Integer getScheduledThreadPoolMaxSize() {
|
||||
return mcfProperties.getScheduledThreadPoolMaxSize();
|
||||
}
|
||||
|
|
|
@ -634,6 +634,32 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
|||
return raProperties.isFailoverOnInitialConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set cacheDestinations
|
||||
*
|
||||
* @param cacheDestinations The value
|
||||
*/
|
||||
public void setCacheDestinations(final Boolean cacheDestinations) {
|
||||
if (ActiveMQResourceAdapter.trace) {
|
||||
ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")");
|
||||
}
|
||||
|
||||
raProperties.setCacheDestinations(cacheDestinations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get isCacheDestinations
|
||||
*
|
||||
* @return The value
|
||||
*/
|
||||
public Boolean isCacheDestinations() {
|
||||
if (ActiveMQResourceAdapter.trace) {
|
||||
ActiveMQRALogger.LOGGER.trace("isCacheDestinations()");
|
||||
}
|
||||
|
||||
return raProperties.isCacheDestinations();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set compressLargeMessage
|
||||
*
|
||||
|
@ -1911,6 +1937,11 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
|
|||
cf.setFailoverOnInitialConnection(val);
|
||||
}
|
||||
|
||||
val = overrideProperties.isCacheDestinations() != null ? overrideProperties.isCacheDestinations() : raProperties.isCacheDestinations();
|
||||
if (val != null) {
|
||||
cf.setCacheDestinations(val);
|
||||
}
|
||||
|
||||
Integer val2 = overrideProperties.getConsumerMaxRate() != null ? overrideProperties.getConsumerMaxRate() : raProperties.getConsumerMaxRate();
|
||||
if (val2 != null) {
|
||||
cf.setConsumerMaxRate(val2);
|
||||
|
|
|
@ -112,6 +112,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
|
|||
|
||||
private Boolean useGlobalPools;
|
||||
|
||||
private Boolean cacheDestinations;
|
||||
|
||||
private Integer initialMessagePacketSize;
|
||||
|
||||
private Integer scheduledThreadPoolMaxSize;
|
||||
|
@ -612,6 +614,21 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
|
|||
this.useGlobalPools = useGlobalPools;
|
||||
}
|
||||
|
||||
public Boolean isCacheDestinations() {
|
||||
if (ConnectionFactoryProperties.trace) {
|
||||
ActiveMQRALogger.LOGGER.trace("isCacheDestinations()");
|
||||
}
|
||||
return cacheDestinations;
|
||||
}
|
||||
|
||||
public void setCacheDestinations(final Boolean cacheDestinations) {
|
||||
if (ConnectionFactoryProperties.trace) {
|
||||
ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")");
|
||||
}
|
||||
hasBeenUpdated = true;
|
||||
this.cacheDestinations = cacheDestinations;
|
||||
}
|
||||
|
||||
public Integer getScheduledThreadPoolMaxSize() {
|
||||
if (ConnectionFactoryProperties.trace) {
|
||||
ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()");
|
||||
|
|
|
@ -151,6 +151,11 @@ tuning:
|
|||
the wire, so if you re-use `SimpleString` instances between calls
|
||||
then you can avoid some unnecessary copying.
|
||||
|
||||
- If using frameworks like Spring, configure destinations permanently broker side
|
||||
and enable `destinationCache` on the client side.
|
||||
See the [Setting The Destination Cache](using-jms.md)
|
||||
for more information on this.
|
||||
|
||||
## Tuning Transport Settings
|
||||
|
||||
- TCP buffer sizes. If you have a fast network and fast machines you
|
||||
|
|
|
@ -377,3 +377,14 @@ consumer to send acknowledgements in batches rather than individually
|
|||
saving valuable bandwidth. This can be configured on the connection
|
||||
factory via the `transactionBatchSize` element and is set in bytes.
|
||||
The default is 1024 \* 1024.
|
||||
|
||||
### Setting The Destination Cache
|
||||
|
||||
Many frameworks such as Spring resolve the destination by name on every operation,
|
||||
this can cause a performance issue and extra calls to the broker,
|
||||
in a scenario where destinations (addresses) are permanent broker side,
|
||||
such as they are managed by a platform or operations team.
|
||||
using `destinationCache` element, you can toggle on the destination cache
|
||||
to improve the performance and reduce the calls to the broker.
|
||||
This should not be used if destinations (addresses) are not permanent broker side,
|
||||
as in dynamic creation/deletion.
|
||||
|
|
|
@ -258,6 +258,12 @@
|
|||
<config-property-name>PasswordCodec</config-property-name>
|
||||
<config-property-type>java.lang.String</config-property-type>
|
||||
<config-property-value>org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=clusterpassword;algorithm=something</config-property-value>
|
||||
</config-property>
|
||||
<config-property>
|
||||
<description>Cache destinations per session</description>
|
||||
<config-property-name>CacheDestinations</config-property-name>
|
||||
<config-property-type>java.lang.Boolean</config-property-type>
|
||||
<config-property-value>false</config-property-value>
|
||||
</config-property>-->
|
||||
|
||||
<outbound-resourceadapter>
|
||||
|
|
|
@ -253,6 +253,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
|
|||
" <config-property-type>boolean</config-property-type>\n" +
|
||||
" <config-property-value></config-property-value>\n" +
|
||||
" </config-property>\n" +
|
||||
" <config-property>" +
|
||||
" <description>Cache destinations per session</description>" +
|
||||
" <config-property-name>CacheDestinations</config-property-name>" +
|
||||
" <config-property-type>boolean</config-property-type>" +
|
||||
" <config-property-value></config-property-value>" +
|
||||
" </config-property>" +
|
||||
" <config-property>\n" +
|
||||
" <description>max number of threads for scheduled thread pool</description>\n" +
|
||||
" <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>\n" +
|
||||
|
|
Loading…
Reference in New Issue